leonardBang commented on code in PR #23079:
URL: https://github.com/apache/flink/pull/23079#discussion_r1286081508


##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/gpu/MatrixVectorMul.java:
##########
@@ -95,9 +98,22 @@ public static void main(String[] args) throws Exception {
         final int dataSize = params.getInt("data-size", DEFAULT_DATA_SIZE);
         final String resourceName = params.get("resource-name", DEFAULT_RESOURCE_NAME);
 
-        DataStream<List<Float>> result =
-                env.addSource(new RandomVectorSource(dimension, dataSize))
-                        .map(new Multiplier(dimension, resourceName));
+        GeneratorFunction<Long, List<Float>> generatorFunction =
+                index -> {
+                    List<Float> randomRecord = new ArrayList<>();
+                    for (int i = 0; i < dimension; ++i) {
+                        randomRecord.add((float) Math.random());
+                    }
+                    return randomRecord;
+                };
+
+        DataGeneratorSource<List<Float>> generatorSource =
+                new DataGeneratorSource<>(generatorFunction, dataSize, Types.LIST(Types.FLOAT));
+
+        DataStreamSource<List<Float>> result =
+                env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Vectors Source");
+
+        result.print();

Review Comment:
   This `result.print()` call looks redundant.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java:
##########
@@ -62,7 +66,7 @@ public static void main(String[] args) throws Exception {
         // ---- print some usage help ----
 
         System.out.println(
-                "Usage with built-in data generator: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms>]");
+                "Usage with built-in data generator: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms> | --rps <records-per-second>]");

Review Comment:
   How about removing the legacy `--sleep <sleep-per-record-in-ms>` option?



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java:
##########
@@ -135,7 +135,7 @@ public static void main(String[] args) throws Exception {
                             generatorFunction,
                             Long.MAX_VALUE,
                             RateLimiterStrategy.perSecond(recordsPerSecond),
-                            Types.POJO(Event.class));
+                            TypeInformation.of(Event.class));

Review Comment:
   minor: we can move this change into the previous commit so that this commit
stays clean.



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java:
##########
@@ -97,38 +110,34 @@ public Tuple2<Long, Long> reduce(Tuple2<Long, Long> value1, Tuple2<Long, Long> v
         }
     }
 
-    /** Parallel data source that serves a list of key-value pairs. */
-    private static class DataSource extends RichParallelSourceFunction<Tuple2<Long, Long>> {
-
-        private volatile boolean running = true;
-
-        @Override
-        public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+    private static class DataGeneratorFunction

Review Comment:
   Could you add a Javadoc for this class?



##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java:
##########
@@ -35,20 +40,33 @@ public static void main(String[] args) throws Exception {
 
         final ParameterTool params = ParameterTool.fromArgs(args);
 
-        double errorRate = params.getDouble("error-rate", 0.0);
-        int sleep = params.getInt("sleep", 1);
-
-        String kafkaTopic = params.get("kafka-topic");
-        String brokers = params.get("brokers", "localhost:9092");
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);

Review Comment:
   Is there a limitation that requires changing the parallelism from the default of 4 to 1?



##########
flink-examples/flink-examples-streaming/pom.xml:
##########
@@ -70,14 +70,16 @@ under the License.
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-datagen</artifactId>
                        <version>${project.version}</version>
-               </dependency>
+                       <!-- required by the shade plugin -->
+                       <optional>${flink.markBundledAsOptional}</optional>
+       </dependency>

Review Comment:
   minor: please keep the indentation aligned.



##########
flink-examples/flink-examples-streaming/pom.xml:
##########
@@ -70,14 +70,16 @@ under the License.
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-connector-datagen</artifactId>
                        <version>${project.version}</version>
-               </dependency>
+                       <!-- required by the shade plugin -->
+                       <optional>${flink.markBundledAsOptional}</optional>
+       </dependency>
 
-               <dependency>
-                       <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-shaded-jackson</artifactId>
-               </dependency>
+       <dependency>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-shaded-jackson</artifactId>
+       </dependency>
 
-               <!-- test dependencies -->
+       <!-- test dependencies -->

Review Comment:
   Please revert these unnecessary changes.



##########
flink-examples/flink-examples-streaming/pom.xml:
##########
@@ -341,6 +343,19 @@ under the License.
                                <groupId>org.apache.maven.plugins</groupId>
                                <artifactId>maven-shade-plugin</artifactId>
                                <executions>
+                                       <execution>
+                                               <phase>package</phase>

Review Comment:
   Shading the `data-gen` connector into flink-examples-streaming is necessary
and makes sense to me. minor: we can add a meaningful `<id>` to describe the
purpose of this shade execution.
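
   A rough sketch of what such an execution could look like. The id
   `shade-datagen-connector` is only a hypothetical name, and the
   `<configuration>` block is an assumption about what this execution bundles,
   not taken from the PR:

   ```xml
   <execution>
       <!-- hypothetical id; any name that describes the shade purpose works -->
       <id>shade-datagen-connector</id>
       <phase>package</phase>
       <goals>
           <goal>shade</goal>
       </goals>
       <!-- assumed configuration: bundle only the datagen connector -->
       <configuration>
           <artifactSet>
               <includes>
                   <include>org.apache.flink:flink-connector-datagen</include>
               </includes>
           </artifactSet>
       </configuration>
   </execution>
   ```

   With an explicit id, the execution shows up under that name in the Maven
   build log, which makes it easier to tell apart from the other shade
   executions in this module.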



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.