lindong28 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1130375623


##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/ExampleStages.java:
##########
@@ -110,6 +111,10 @@ public static SumModel load(StreamTableEnvironment tEnv, 
String path) throws IOE
             SumModel model = ReadWriteUtils.loadStageParam(path);
             return model.setModelData(modelDataTable);
         }
+
+        public static SumModelServable loadServable(String path) throws 
IOException {

Review Comment:
   Can we add a test this method directly (rather than via reflection) similar 
to the existing `ReadWriteUtilsTest#testModelSaveLoad`? This would allow 
developers to quickly identify where this method is tested via IDE. 
   
   And it is also more consistent with the idea that we want to test every 
public API in the same way as how users are going to use it.



##########
flink-ml-servable-core/src/test/java/org/apache/flink/ml/servable/builder/ExampleServables.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.servable.builder;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.ModelServable;
+import org.apache.flink.ml.servable.api.Row;
+import org.apache.flink.ml.servable.api.TransformerServable;
+import org.apache.flink.ml.servable.types.DataTypes;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.Serializer;
+import org.apache.flink.ml.util.ServableReadWriteUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Defines Servable subclasses to be used in unit tests. */
+public class ExampleServables {
+
+    /**
+     * A {@link TransformerServable} subclass that increments every value in 
the input dataframe by
+     * `delta` and outputs the resulting values.
+     */
+    public static class SumModelServable implements 
ModelServable<SumModelServable> {
+
+        private static final String COL_NAME = "input";
+
+        private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+        private int delta;
+
+        public SumModelServable() {
+            ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+        }
+
+        @Override
+        public DataFrame transform(DataFrame input) {
+            List<Row> outputRows = new ArrayList<>();
+            for (Row row : input.collect()) {
+                assert row.size() == 1;
+                int originValue = (Integer) row.get(0);
+                outputRows.add(new Row(Collections.singletonList(originValue + 
delta)));
+            }
+            return new DataFrame(
+                    Collections.singletonList(COL_NAME),
+                    Collections.singletonList(DataTypes.INT),
+                    outputRows);
+        }
+
+        @Override
+        public Map<Param<?>, Object> getParamMap() {
+            return paramMap;
+        }
+
+        public static SumModelServable load(String path) throws IOException {
+            SumModelServable servable =
+                    ServableReadWriteUtils.loadServableParam(path, 
SumModelServable.class);
+
+            try (InputStream inputStream = 
ServableReadWriteUtils.loadModelData(path)) {
+                DataInputViewStreamWrapper dataInputViewStreamWrapper =
+                        new DataInputViewStreamWrapper(inputStream);
+                int delta = 
IntSerializer.INSTANCE.deserialize(dataInputViewStreamWrapper);

Review Comment:
   Would it be simpler to do the following and remove the method 
`setModelData()`?
   
   ```
   servable.delta = 
IntSerializer.INSTANCE.deserialize(dataInputViewStreamWrapper);
   return servable;
   ```



##########
flink-ml-servable-core/src/main/java/org/apache/flink/ml/util/ServableReadWriteUtils.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.util;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.servable.api.TransformerServable;
+import org.apache.flink.ml.servable.builder.PipelineModelServable;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.ml.util.FileUtils.loadMetadata;
+
+/** Utility methods for loading Servables. */
+public class ServableReadWriteUtils {
+
+    /**
+     * Loads the servables of a {@link PipelineModelServable} from the given 
path.
+     *
+     * <p>The method throws RuntimeException if the expectedClassName is not 
empty AND it does not
+     * match the className of the previously saved PipelineModel.
+     *
+     * @param path The parent directory to load the PipelineModelServable 
metadata and its
+     *     servables.
+     * @return A list of servables.
+     */
+    public static List<TransformerServable<?>> loadPipelineOfServables(String 
path)

Review Comment:
   How about naming it `loadPipeline` for consistency with 
`ReadWriteUtils#loadPipeline`?



##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/IntegrateWithServableTest.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api;
+
+import org.apache.flink.ml.api.ExampleStages.SumModel;
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.Row;
+import org.apache.flink.ml.servable.builder.ExampleServables.SumModelServable;
+import org.apache.flink.ml.servable.types.DataTypes;
+import org.apache.flink.ml.util.Serializer;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.ml.servable.TestUtils.compareDataFrame;
+import static org.apache.flink.ml.util.TestUtils.saveAndLoadServable;
+
+/** Tests the behavior of integration with Servables. */
+public class IntegrateWithServableTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+
+    private static final DataFrame INPUT =
+            new DataFrame(
+                    Collections.singletonList("input"),
+                    Collections.singletonList(DataTypes.INT),
+                    Arrays.asList(
+                            new Row(Collections.singletonList(1)),
+                            new Row(Collections.singletonList(2)),
+                            new Row(Collections.singletonList(3))));
+
+    private static final DataFrame EXPECTED_OUTPUT =
+            new DataFrame(
+                    Collections.singletonList("input"),
+                    Collections.singletonList(DataTypes.INT),
+                    Arrays.asList(
+                            new Row(Collections.singletonList(11)),
+                            new Row(Collections.singletonList(12)),
+                            new Row(Collections.singletonList(13))));
+
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Before
+    public void before() {
+        StreamExecutionEnvironment env = TestUtils.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+    }
+
+    @Test
+    public void testSaveModelAndLoadAsServable() throws Exception {

Review Comment:
   Would it be simpler to use `testSaveModelLoadServable()`?



##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/IntegrateWithServableTest.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api;
+
+import org.apache.flink.ml.api.ExampleStages.SumModel;
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.Row;
+import org.apache.flink.ml.servable.builder.ExampleServables.SumModelServable;
+import org.apache.flink.ml.servable.types.DataTypes;
+import org.apache.flink.ml.util.Serializer;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.ml.servable.TestUtils.compareDataFrame;
+import static org.apache.flink.ml.util.TestUtils.saveAndLoadServable;
+
+/** Tests the behavior of integration with Servables. */
+public class IntegrateWithServableTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+
+    private static final DataFrame INPUT =
+            new DataFrame(
+                    Collections.singletonList("input"),
+                    Collections.singletonList(DataTypes.INT),
+                    Arrays.asList(
+                            new Row(Collections.singletonList(1)),
+                            new Row(Collections.singletonList(2)),
+                            new Row(Collections.singletonList(3))));
+
+    private static final DataFrame EXPECTED_OUTPUT =
+            new DataFrame(
+                    Collections.singletonList("input"),
+                    Collections.singletonList(DataTypes.INT),
+                    Arrays.asList(
+                            new Row(Collections.singletonList(11)),
+                            new Row(Collections.singletonList(12)),
+                            new Row(Collections.singletonList(13))));
+
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Before
+    public void before() {
+        StreamExecutionEnvironment env = TestUtils.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+    }
+
+    @Test
+    public void testSaveModelAndLoadAsServable() throws Exception {
+        SumModel model = new SumModel().setModelData(tEnv.fromValues(10));
+
+        SumModelServable servable =
+                saveAndLoadServable(tEnv, model, 
tempFolder.newFolder().getAbsolutePath());
+
+        DataFrame output = servable.transform(INPUT);
+
+        compareDataFrame(EXPECTED_OUTPUT, output);
+    }
+
+    @Test
+    public void testGetModelDataAndSetToServable() throws Exception {

Review Comment:
   Would it be simpler to name it `testSetModelData()`?



##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/IntegrateWithServableTest.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api;
+
+import org.apache.flink.ml.api.ExampleStages.SumModel;
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.Row;
+import org.apache.flink.ml.servable.builder.ExampleServables.SumModelServable;
+import org.apache.flink.ml.servable.types.DataTypes;
+import org.apache.flink.ml.util.Serializer;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.ml.servable.TestUtils.compareDataFrame;
+import static org.apache.flink.ml.util.TestUtils.saveAndLoadServable;
+
+/** Tests the behavior of integration with Servables. */
+public class IntegrateWithServableTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+
+    private static final DataFrame INPUT =
+            new DataFrame(
+                    Collections.singletonList("input"),
+                    Collections.singletonList(DataTypes.INT),
+                    Arrays.asList(
+                            new Row(Collections.singletonList(1)),
+                            new Row(Collections.singletonList(2)),
+                            new Row(Collections.singletonList(3))));
+
+    private static final DataFrame EXPECTED_OUTPUT =
+            new DataFrame(
+                    Collections.singletonList("input"),
+                    Collections.singletonList(DataTypes.INT),
+                    Arrays.asList(
+                            new Row(Collections.singletonList(11)),
+                            new Row(Collections.singletonList(12)),
+                            new Row(Collections.singletonList(13))));
+
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Before
+    public void before() {
+        StreamExecutionEnvironment env = TestUtils.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+    }
+
+    @Test
+    public void testSaveModelAndLoadAsServable() throws Exception {
+        SumModel model = new SumModel().setModelData(tEnv.fromValues(10));
+
+        SumModelServable servable =
+                saveAndLoadServable(tEnv, model, 
tempFolder.newFolder().getAbsolutePath());
+
+        DataFrame output = servable.transform(INPUT);
+
+        compareDataFrame(EXPECTED_OUTPUT, output);
+    }
+
+    @Test
+    public void testGetModelDataAndSetToServable() throws Exception {
+        SumModel model = new SumModel().setModelData(tEnv.fromValues(10));
+        Table modelDataTable = model.getModelData()[0];
+        Integer modelData =
+                tEnv.toDataStream(modelDataTable)
+                        .map(x -> (int) x.getField(0))
+                        .executeAndCollect()
+                        .next();
+
+        byte[] serializedModelData = Serializer.serialize(modelData);

Review Comment:
   Serializing the model data as object stream is not very efficient, similar 
to the serialization using kryo.  And we don't expect user code to be directly 
aware of the internal data structure of the model data.
   
   It would be useful for tests to show the best practice of developing and 
using servable.
   
   How is one possible way to convert the a Table from `Model#getModelData` to 
an InputStream that can be provided to `ModelServable#setModelData`.
   
   1. Table of model data object (in this case model object is an integer)
   2. Table of byte[]
   3. List of bytes[]
   4. InputStream of bytes[]
   
   
   
   
   



##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/IntegrateWithServableTest.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api;
+
+import org.apache.flink.ml.api.ExampleStages.SumModel;
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.Row;
+import org.apache.flink.ml.servable.builder.ExampleServables.SumModelServable;
+import org.apache.flink.ml.servable.types.DataTypes;
+import org.apache.flink.ml.util.Serializer;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.ml.servable.TestUtils.compareDataFrame;
+import static org.apache.flink.ml.util.TestUtils.saveAndLoadServable;
+
+/** Tests the behavior of integration with Servables. */
+public class IntegrateWithServableTest extends AbstractTestBase {

Review Comment:
   Would it be simpler to name it either `ServableTest`?



##########
flink-ml-servable-core/src/test/java/org/apache/flink/ml/servable/builder/ExampleServables.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.servable.builder;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.ModelServable;
+import org.apache.flink.ml.servable.api.Row;
+import org.apache.flink.ml.servable.api.TransformerServable;
+import org.apache.flink.ml.servable.types.DataTypes;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.Serializer;
+import org.apache.flink.ml.util.ServableReadWriteUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Defines Servable subclasses to be used in unit tests. */
+public class ExampleServables {
+
+    /**
+     * A {@link TransformerServable} subclass that increments every value in 
the input dataframe by
+     * `delta` and outputs the resulting values.
+     */
+    public static class SumModelServable implements 
ModelServable<SumModelServable> {
+
+        private static final String COL_NAME = "input";
+
+        private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+        private int delta;
+
+        public SumModelServable() {
+            ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+        }
+
+        @Override
+        public DataFrame transform(DataFrame input) {
+            List<Row> outputRows = new ArrayList<>();
+            for (Row row : input.collect()) {
+                assert row.size() == 1;
+                int originValue = (Integer) row.get(0);
+                outputRows.add(new Row(Collections.singletonList(originValue + 
delta)));
+            }
+            return new DataFrame(
+                    Collections.singletonList(COL_NAME),
+                    Collections.singletonList(DataTypes.INT),
+                    outputRows);
+        }
+
+        @Override
+        public Map<Param<?>, Object> getParamMap() {
+            return paramMap;
+        }
+
+        public static SumModelServable load(String path) throws IOException {
+            SumModelServable servable =
+                    ServableReadWriteUtils.loadServableParam(path, 
SumModelServable.class);
+
+            try (InputStream inputStream = 
ServableReadWriteUtils.loadModelData(path)) {
+                DataInputViewStreamWrapper dataInputViewStreamWrapper =
+                        new DataInputViewStreamWrapper(inputStream);
+                int delta = 
IntSerializer.INSTANCE.deserialize(dataInputViewStreamWrapper);
+                return servable.setDelta(delta);
+            }
+        }
+
+        public SumModelServable setDelta(int delta) {
+            this.delta = delta;
+            return this;
+        }
+
+        public SumModelServable setModelData(InputStream... modelDataInputs) 
throws IOException {
+            assert modelDataInputs.length == 1;

Review Comment:
   Would the following code be more consistent with the existing way of 
checking argument length?
   
   ```
   Preconditions.checkArgument(inputs.length == 1);
   ```



##########
flink-ml-servable-core/src/test/java/org/apache/flink/ml/servable/TestUtils.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.servable;
+
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.Row;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/** Utility methods for tests. */
+public class TestUtils {
+
+    /** Compares two dataframes. */
+    public static void compareDataFrame(DataFrame first, DataFrame second) {

Review Comment:
   Would it be simpler to name it `compare(...)`?
   
   Should this method return `int` for consistency with other `compare` methods?
   
   If this method throw exception when the given data frames are not equal, it 
seems better to name it `assertEquals(...)`.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to