leonardBang commented on code in PR #23079:
URL: https://github.com/apache/flink/pull/23079#discussion_r1286081508

##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/gpu/MatrixVectorMul.java:
##########
@@ -95,9 +98,22 @@ public static void main(String[] args) throws Exception {
         final int dataSize = params.getInt("data-size", DEFAULT_DATA_SIZE);
         final String resourceName = params.get("resource-name", DEFAULT_RESOURCE_NAME);
-        DataStream<List<Float>> result =
-                env.addSource(new RandomVectorSource(dimension, dataSize))
-                        .map(new Multiplier(dimension, resourceName));
+        GeneratorFunction<Long, List<Float>> generatorFunction =
+                index -> {
+                    List<Float> randomRecord = new ArrayList<>();
+                    for (int i = 0; i < dimension; ++i) {
+                        randomRecord.add((float) Math.random());
+                    }
+                    return randomRecord;
+                };
+
+        DataGeneratorSource<List<Float>> generatorSource =
+                new DataGeneratorSource<>(generatorFunction, dataSize, Types.LIST(Types.FLOAT));
+
+        DataStreamSource<List<Float>> result =
+                env.fromSource(generatorSource, WatermarkStrategy.noWatermarks(), "Vectors Source");
+
+        result.print();

Review Comment:
   This `result.print()` call is redundant.

##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java:
##########
@@ -62,7 +66,7 @@ public static void main(String[] args) throws Exception {
         // ---- print some usage help ----
         System.out.println(
-                "Usage with built-in data generator: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms>]");
+                "Usage with built-in data generator: StateMachineExample [--error-rate <probability-of-invalid-transition>] [--sleep <sleep-per-record-in-ms> | --rps <records-per-second>]");

Review Comment:
   How about removing the legacy `--sleep <sleep-per-record-in-ms>` option?

##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java:
##########
@@ -135,7 +135,7 @@ public static void main(String[] args) throws Exception {
                         generatorFunction,
                         Long.MAX_VALUE,
                         RateLimiterStrategy.perSecond(recordsPerSecond),
-                        Types.POJO(Event.class));
+                        TypeInformation.of(Event.class));

Review Comment:
   minor: we can move this change into the commit above so that this commit stays clean.

##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java:
##########
@@ -97,38 +110,34 @@ public Tuple2<Long, Long> reduce(Tuple2<Long, Long> value1, Tuple2<Long, Long> v
         }
     }
-    /** Parallel data source that serves a list of key-value pairs. */
-    private static class DataSource extends RichParallelSourceFunction<Tuple2<Long, Long>> {
-
-        private volatile boolean running = true;
-
-        @Override
-        public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
+    private static class DataGeneratorFunction

Review Comment:
   Could you add a Javadoc for this class?

##########
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.java:
##########
@@ -35,20 +40,33 @@ public static void main(String[] args) throws Exception {
         final ParameterTool params = ParameterTool.fromArgs(args);
-        double errorRate = params.getDouble("error-rate", 0.0);
-        int sleep = params.getInt("sleep", 1);
-
-        String kafkaTopic = params.get("kafka-topic");
-        String brokers = params.get("brokers", "localhost:9092");
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);

Review Comment:
   Is there a limitation that requires changing the parallelism from the default of 4 to 1?

##########
flink-examples/flink-examples-streaming/pom.xml:
##########
@@ -70,14 +70,16 @@ under the License.
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-datagen</artifactId>
             <version>${project.version}</version>
-        </dependency>
+            <!-- required by the shade plugin -->
+            <optional>${flink.markBundledAsOptional}</optional>
+        </dependency>

Review Comment:
   minor: please keep the indentation aligned.

##########
flink-examples/flink-examples-streaming/pom.xml:
##########
@@ -70,14 +70,16 @@ under the License.
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-datagen</artifactId>
             <version>${project.version}</version>
-        </dependency>
+            <!-- required by the shade plugin -->
+            <optional>${flink.markBundledAsOptional}</optional>
+        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-shaded-jackson</artifactId>
-        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-jackson</artifactId>
+        </dependency>
-        <!-- test dependencies -->
+        <!-- test dependencies -->

Review Comment:
   Please revert the unnecessary changes.

##########
flink-examples/flink-examples-streaming/pom.xml:
##########
@@ -341,6 +343,19 @@ under the License.
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-shade-plugin</artifactId>
                 <executions>
+                    <execution>
+                        <phase>package</phase>

Review Comment:
   Shading the `data-gen` connector into flink-examples-streaming is necessary and makes sense to me.
   
   minor: we can add a meaningful `<id>` to describe the purpose of this shade execution.
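
   To illustrate the `<id>` suggestion, here is a minimal sketch of a named shade execution. The id value `shade-datagen-connector` and the `<goals>` block are assumptions for illustration only; the hunk shows just `<execution>` and `<phase>package</phase>`, so the rest of the real configuration may differ.

```xml
<!-- Sketch only: the <id> value is a hypothetical name, not taken from the PR. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <!-- Hypothetical id that states what this execution is for -->
            <id>shade-datagen-connector</id>
            <phase>package</phase>
            <goals>
                <!-- Assumed goal; not visible in the reviewed hunk -->
                <goal>shade</goal>
            </goals>
        </execution>
    </executions>
</plugin>
```

   A descriptive id appears in the Maven build log next to the plugin goal, which makes it easier to tell this execution apart from the other shade executions in the same pom.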