yunfengzhou-hub commented on code in PR #111:
URL: https://github.com/apache/flink-ml/pull/111#discussion_r898869251


##########
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/datagenerator/common/DenseVectorArrayGenerator.java:
##########
@@ -18,100 +18,50 @@
 
 package org.apache.flink.ml.benchmark.datagenerator.common;
 
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.ml.benchmark.datagenerator.InputDataGenerator;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.ml.benchmark.datagenerator.param.HasArraySize;
 import org.apache.flink.ml.benchmark.datagenerator.param.HasVectorDim;
-import org.apache.flink.ml.common.datastream.TableUtils;
 import org.apache.flink.ml.linalg.DenseVector;
-import org.apache.flink.ml.param.Param;
-import org.apache.flink.ml.util.ParamUtils;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.Schema;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.util.NumberSequenceIterator;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-
 /** A DataGenerator which creates a table of DenseVector array. */
-public class DenseVectorArrayGenerator
-        implements InputDataGenerator<DenseVectorArrayGenerator>,
-                HasArraySize<DenseVectorArrayGenerator>,
+public class DenseVectorArrayGenerator extends 
InputTableGenerator<DenseVectorArrayGenerator>
+        implements HasArraySize<DenseVectorArrayGenerator>,
                 HasVectorDim<DenseVectorArrayGenerator> {
-    private final Map<Param<?>, Object> paramMap = new HashMap<>();
-
-    public DenseVectorArrayGenerator() {
-        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
-    }
 
     @Override
-    public Table[] getData(StreamTableEnvironment tEnv) {
-        StreamExecutionEnvironment env = 
TableUtils.getExecutionEnvironment(tEnv);
-
-        DataStream<DenseVector[]> dataStream =
-                env.fromParallelCollection(
-                                new NumberSequenceIterator(1L, getNumValues()),
-                                BasicTypeInfo.LONG_TYPE_INFO)
-                        .map(
-                                new 
GenerateRandomContinuousVectorArrayFunction(
-                                        getSeed(), getVectorDim(), 
getArraySize()));
-
-        Schema schema = Schema.newBuilder().column("f0", 
DataTypes.of(DenseVector[].class)).build();
-        Table dataTable = tEnv.fromDataStream(dataStream, schema);
-        if (getColNames() != null) {
-            Preconditions.checkState(getColNames().length == 1);
-            Preconditions.checkState(getColNames()[0].length == 1);
-            dataTable = dataTable.as(getColNames()[0][0]);
-        }
-
-        return new Table[] {dataTable};
-    }
+    protected RowGenerator[] getRowGenerators() {
+        int arraySize = getArraySize();
+        int vectorDim = getVectorDim();
+        String[][] columnNames = getColNames();
 
-    private static class GenerateRandomContinuousVectorArrayFunction
-            extends RichMapFunction<Long, DenseVector[]> {
-        private final int vectorDim;
-        private final long initSeed;
-        private final int arraySize;
-        private Random random;
-
-        private GenerateRandomContinuousVectorArrayFunction(
-                long initSeed, int vectorDim, int arraySize) {
-            this.vectorDim = vectorDim;
-            this.initSeed = initSeed;
-            this.arraySize = arraySize;
-        }
-
-        @Override
-        public void open(Configuration parameters) throws Exception {
-            super.open(parameters);
-            int index = getRuntimeContext().getIndexOfThisSubtask();
-            random = new Random(Tuple2.of(initSeed, index).hashCode());
-        }
+        return new RowGenerator[] {
+            new RowGenerator(getNumValues(), getSeed()) {
+                @Override
+                protected Row nextRow() {
+                    DenseVector[] result = new DenseVector[arraySize];
+                    for (int i = 0; i < arraySize; i++) {
+                        result[i] = new DenseVector(vectorDim);
+                        for (int j = 0; j < vectorDim; j++) {
+                            result[i].values[j] = random.nextDouble();
+                        }
+                    }
+                    Row row = new Row(1);
+                    row.setField(0, result);
+                    return row;
+                }
 
-        @Override
-        public DenseVector[] map(Long value) {
-            DenseVector[] result = new DenseVector[arraySize];
-            for (int i = 0; i < arraySize; i++) {
-                result[i] = new DenseVector(vectorDim);
-                for (int j = 0; j < vectorDim; j++) {
-                    result[i].values[j] = random.nextDouble();
+                @Override
+                protected RowTypeInfo getRowTypeInfo() {
+                    Preconditions.checkState(columnNames.length == 1);
+                    Preconditions.checkState(columnNames.length == 1);

Review Comment:
   The second check duplicates the first one; it should be `Preconditions.checkState(columnNames[0].length == 1)`.
   
   Also, do you think it would be better to move these checks from `getRowTypeInfo` to `getRowGenerators`?



##########
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkUtils.java:
##########
@@ -99,6 +99,7 @@ private static BenchmarkResult runBenchmark(
             DataGenerator<?> modelDataGenerator)
             throws Exception {
         StreamExecutionEnvironment env = 
TableUtils.getExecutionEnvironment(tEnv);
+        env.getConfig().enableObjectReuse();

Review Comment:
   I'm not sure we should add configurations like this.
   - `enableObjectReuse` is not a panacea for all Flink ML algorithms; some algorithms might even fail when object reuse is enabled.
   - Flink ML users can always tune performance themselves by changing configuration parameters. Our focus should be on optimizing implementation details that are invisible to users.



##########
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/datagenerator/common/DoubleGenerator.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark.datagenerator.common;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+
+/** A DataGenerator which creates a table of doubles. */
+public class DoubleGenerator extends InputTableGenerator<DoubleGenerator> {

Review Comment:
   Shall we add benchmark configuration files for algorithms like `StringIndexer` and `Bucketizer`? `kmeans-benchmark.json` could serve as a reference.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to