afedulov commented on code in PR #20757:
URL: https://github.com/apache/flink/pull/20757#discussion_r963648415


##########
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/GeneratorFunction.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source.lib;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+
+/**
+ * Base interface for data generator functions. Data generator functions take elements and transform
+ * them, element-wise. They are the core building block of the {@link DataGeneratorSource} that
+ * drives the data generation process by supplying "index" values of type Long. It makes it possible
+ * to produce specific elements at concrete positions of the generated data stream.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * GeneratorFunction<Long, String> generatorFunction = index -> "Number: " + index;
+ * DataGeneratorSource<String> source =
+ *         new DataGeneratorSource<>(generatorFunction, 1000, Types.STRING);
+ * }</pre>
+ *
+ * @param <T> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+@Experimental
+public interface GeneratorFunction<T, O> extends Function {
+
+    /**
+     * Initialization method for the function. It is called once before the actual data mapping
+     * methods.
+     */
+    default void open(SourceReaderContext readerContext) throws Exception {}

Review Comment:
   The requirement to have an initialization method was raised [here](https://lists.apache.org/thread/f2lgo9ydmd1r4xyl8fg7rxcl87olshn2).
   In general, I see the generator being applicable in three areas:
   - Demos/PoCs
   - Benchmarks
   - Internal utilities for testing (substituting the `SourceFunction`-based functionality)
   
   For demos, PoCs, and benchmarks, I can imagine the following methods being useful:
   ```
   SourceReaderMetricGroup metricGroup();
   Configuration getConfiguration();
   String getLocalHostName();
   int getIndexOfSubtask();
   int currentParallelism();
   ```
   I could imagine users, for instance, emulating topic pre-partitioning by generating data based on the subtask index or the TaskManager host. The configuration could be useful for controlling data generation in benchmarks without code changes, etc.
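To make these two ideas concrete, here is a rough, self-contained Java sketch. It deliberately avoids Flink types. `ContextSketch` is a hypothetical stand-in that exposes only `getIndexOfSubtask()`, `currentParallelism()`, and a plain `Map` in place of `Configuration`. `GeneratorFunctionSketch` mirrors the interface in the diff, and its per-element method is assumed to be called `map`, since the excerpt ends before that method. The `payload.size` key is invented for illustration.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for the parts of SourceReaderContext used below;
// a plain Map replaces Flink's Configuration to keep the sketch self-contained.
interface ContextSketch {
    int getIndexOfSubtask();

    int currentParallelism();

    Map<String, String> getConfiguration();
}

// Stand-in mirroring the GeneratorFunction shape from the diff;
// the name of the per-element method (map) is an assumption.
interface GeneratorFunctionSketch<T, O> {
    default void open(ContextSketch readerContext) throws Exception {}

    O map(T value) throws Exception;
}

// Tags every record with the subtask that produced it (emulated pre-partitioning)
// and pads it to a size taken from configuration, so a benchmark can vary the
// record size without code changes.
class PartitionedPayloadGenerator implements GeneratorFunctionSketch<Long, String> {
    private int subtask;
    private int parallelism;
    private String payload = "";

    @Override
    public void open(ContextSketch readerContext) {
        subtask = readerContext.getIndexOfSubtask();
        parallelism = readerContext.currentParallelism();
        // "payload.size" is a hypothetical key; a missing key means no padding.
        int size = Integer.parseInt(
                readerContext.getConfiguration().getOrDefault("payload.size", "0"));
        // Build the padding once in open() instead of on every record.
        payload = String.join("", Collections.nCopies(size, "x"));
    }

    @Override
    public String map(Long index) {
        return "partition-" + subtask + "/" + parallelism + ":" + index + ":" + payload;
    }
}
```

For example, with subtask 1 of 4 and `payload.size=3`, `map(42L)` yields `partition-1/4:42:xxx`. Doing the context lookups in `open()` keeps `map()` free of per-record overhead.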
   
   For internal testing utilities, the ability to communicate with the source coordinator via `sendSourceEventToCoordinator(SourceEvent sourceEvent)` at deterministic stream positions could be handy.
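A minimal sketch of that testing pattern follows, using hypothetical stand-ins rather than Flink types. `CoordinatorContextSketch` models only the coordinator-messaging part of the reader context and takes an `Object` where Flink uses `SourceEvent`. `IndexReachedEvent` and `SignallingGenerator` are invented names, and the `open`/`map` shape is assumed to match the interface in the diff.

```java
// Hypothetical stand-in: only the coordinator-messaging part of SourceReaderContext.
// Flink's parameter type is SourceEvent; Object keeps the sketch self-contained.
interface CoordinatorContextSketch {
    void sendSourceEventToCoordinator(Object sourceEvent);
}

// Hypothetical event signalling that a specific index has been generated.
final class IndexReachedEvent {
    final long index;

    IndexReachedEvent(long index) {
        this.index = index;
    }
}

// Generator (same open/map shape as the diff, assumed) that notifies the coordinator
// exactly when a configured index is produced, e.g. so a test can trigger an action
// such as a checkpoint or a failover at a known stream position.
class SignallingGenerator {
    private final long signalAt;
    private CoordinatorContextSketch ctx;

    SignallingGenerator(long signalAt) {
        this.signalAt = signalAt;
    }

    void open(CoordinatorContextSketch readerContext) {
        this.ctx = readerContext;
    }

    String map(Long index) {
        if (index == signalAt) {
            ctx.sendSourceEventToCoordinator(new IndexReachedEvent(index));
        }
        return "record-" + index;
    }
}
```

Assuming each index is generated exactly once across all parallel readers, only the reader that owns `signalAt` sends the event, so the coordinator sees a single, deterministic signal.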
   
   After looking around for similar implementations, I had the impression that we are usually fairly "generous" in providing more context than the use cases we envision right now strictly need, for example:
[https://github.com/apache/flink/blob/fb95798b1c301152b912c4b8ec4a737ea16d8641/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java#L75-L77](https://github.com/apache/flink/blob/fb95798b1c301152b912c4b8ec4a737ea16d8641/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkFunction.java#L75-L77)
   I mostly wanted to keep it flexible, but if you think we should cut out things that are not immediately used, we could introduce a wrapper around the `SourceReaderContext`.
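Such a wrapper could look roughly like the adapter below. All names are hypothetical. `FullReaderContextSketch` stands in for `SourceReaderContext`, which has more methods than shown. `GeneratorContextSketch` is a narrowed view that exposes only what the envisioned use cases need.

```java
// Hypothetical stand-in for the full reader context
// (Flink's SourceReaderContext has more methods than shown here).
interface FullReaderContextSketch {
    int getIndexOfSubtask();

    int currentParallelism();

    String getLocalHostName();

    void sendSplitRequest();
}

// Hypothetical narrowed context handed to generator functions instead of the full one.
interface GeneratorContextSketch {
    int getIndexOfSubtask();

    int currentParallelism();
}

// Adapter that wraps the full context and forwards only the exposed methods,
// so generator functions cannot reach e.g. sendSplitRequest().
final class GeneratorContextAdapter implements GeneratorContextSketch {
    private final FullReaderContextSketch delegate;

    GeneratorContextAdapter(FullReaderContextSketch delegate) {
        this.delegate = delegate;
    }

    @Override
    public int getIndexOfSubtask() {
        return delegate.getIndexOfSubtask();
    }

    @Override
    public int currentParallelism() {
        return delegate.currentParallelism();
    }
}
```

The trade-off is that a narrow context can later grow more methods without breaking callers. Shrinking a broad one after release would be a breaking change.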


