lindong28 commented on code in PR #219:
URL: https://github.com/apache/flink-ml/pull/219#discussion_r1136431138


##########
flink-ml-servable-lib/src/main/java/org/apache/flink/ml/classification/logisticregression/LogisticRegressionModelServable.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.logisticregression;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.ml.linalg.BLAS;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorSerializer;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.ModelServable;
+import org.apache.flink.ml.servable.api.Row;
+import org.apache.flink.ml.servable.types.BasicType;
+import org.apache.flink.ml.servable.types.DataTypes;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ServableReadWriteUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** A Servable which can be used to classifies data in online inference. */
+public class LogisticRegressionModelServable
+        implements ModelServable<LogisticRegressionModelServable>,
+                LogisticRegressionModelParams<LogisticRegressionModelServable> 
{
+
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    private DenseVector coefficient;
+
+    public LogisticRegressionModelServable() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public DataFrame transform(DataFrame input) {
+        List<Double> predictionResults = new ArrayList<>();
+        List<DenseVector> rawPredictionResults = new ArrayList<>();
+
+        int featuresColIndex = input.getIndex(getFeaturesCol());
+        for (Row row : input.collect()) {
+            Vector features = (Vector) row.get(featuresColIndex);
+            Tuple2<Double, DenseVector> dataPoint = 
predictOneDataPoint(features, coefficient);
+            predictionResults.add(dataPoint.f0);
+            rawPredictionResults.add(dataPoint.f1);
+        }
+
+        input.addColumn(getPredictionCol(), DataTypes.DOUBLE, 
predictionResults);
+        input.addColumn(
+                getRawPredictionCol(), DataTypes.VECTOR(BasicType.DOUBLE), 
rawPredictionResults);
+
+        return input;
+    }
+
+    public LogisticRegressionModelServable setModelData(InputStream... 
modelDataInputs)
+            throws IOException {
+        Preconditions.checkArgument(modelDataInputs.length == 1);
+
+        DataInputViewStreamWrapper inputViewStreamWrapper =
+                new DataInputViewStreamWrapper(modelDataInputs[0]);
+
+        DenseVectorSerializer serializer = new DenseVectorSerializer();
+        coefficient = serializer.deserialize(inputViewStreamWrapper);
+
+        return this;
+    }
+
+    public static LogisticRegressionModelServable load(String path) throws 
IOException {
+        LogisticRegressionModelServable servable =
+                ServableReadWriteUtils.loadServableParam(
+                        path, LogisticRegressionModelServable.class);
+
+        try (InputStream fsDataInputStream = 
ServableReadWriteUtils.loadModelData(path)) {
+            DataInputViewStreamWrapper dataInputViewStreamWrapper =
+                    new DataInputViewStreamWrapper(fsDataInputStream);
+            DenseVectorSerializer serializer = new DenseVectorSerializer();
+            servable.coefficient = 
serializer.deserialize(dataInputViewStreamWrapper);
+            return servable;
+        }
+    }
+
+    /**
+     * The main logic that predicts one input data point.
+     *
+     * @param feature The input feature.
+     * @param coefficient The model parameters.
+     * @return The prediction label and the raw probabilities.
+     */
+    public static Tuple2<Double, DenseVector> predictOneDataPoint(
+            Vector feature, DenseVector coefficient) {
+        double dotValue = BLAS.dot(feature, coefficient);
+        double prob = 1 - 1.0 / (1.0 + Math.exp(dotValue));
+        return Tuple2.of(dotValue >= 0 ? 1. : 0., Vectors.dense(1 - prob, 
prob));
+    }
+
+    /**
+     * Serializes the model data into byte array which can be saved to 
external storage and then be
+     * used to update the Servable by `TransformerServable::setModelData` 
method.
+     *
+     * @param modelData The model data to be serialized.
+     * @return The serialized model data in byte array.
+     */
+    public static byte[] serialize(DenseVector modelData) throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+        DataOutputViewStreamWrapper outputViewStreamWrapper =
+                new DataOutputViewStreamWrapper(outputStream);
+
+        DenseVectorSerializer serializer = new DenseVectorSerializer();
+        serializer.serialize(modelData, outputViewStreamWrapper);
+
+        return outputStream.toByteArray();
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    public DenseVector getCoefficient() {

Review Comment:
   Is it necessary to add this public API for users to get servable model data? 
This public API was not discussed in any existing FLIP.



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/classification/LogisticRegressionTest.java:
##########
@@ -253,6 +255,25 @@ public void testSaveLoadAndPredict() throws Exception {
                 logisticRegression.getRawPredictionCol());
     }
 
+    @Test
+    public void testSaveLoadServable() throws Exception {
+        LogisticRegression logisticRegression = new 
LogisticRegression().setWeightCol("weight");
+        LogisticRegressionModel model = 
logisticRegression.fit(binomialDataTable);
+
+        LogisticRegressionModelServable servable =
+                saveAndLoadServable(
+                        tEnv,
+                        model,
+                        tempFolder.newFolder().getAbsolutePath(),
+                        LogisticRegressionModel::loadServable);
+
+        assertEquals("features", servable.getFeaturesCol());
+        assertEquals("prediction", servable.getPredictionCol());
+        assertEquals("rawPrediction", servable.getRawPredictionCol());
+
+        assertArrayEquals(expectedCoefficient, 
servable.getCoefficient().values, 0.1);

Review Comment:
   Should we verify the result of `servable.transform(...)`, similar to 
`PipelineTest#testPipelineModelServable`?



##########
flink-ml-servable-lib/src/test/java/org/apache/flink/ml/classification/logisticregression/LogisticRegressionModelServableTest.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.logisticregression;
+
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.Row;
+import org.apache.flink.ml.servable.types.BasicType;
+import org.apache.flink.ml.servable.types.DataTypes;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests the {@link LogisticRegressionModelServable}. */
+public class LogisticRegressionModelServableTest {

Review Comment:
   Should we have a test that covers the process of saving the model and loading 
the servable, similar to `PipelineTest#testPipelineModelServable`?



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/logisticregression/LogisticRegressionModel.java:
##########
@@ -150,21 +153,9 @@ public Row map(Row dataPoint) {
                 coefficient = modelData.coefficient;
             }
             DenseVector features = ((Vector) 
dataPoint.getField(featuresCol)).toDense();
-            Row predictionResult = predictOneDataPoint(features, coefficient);
-            return Row.join(dataPoint, predictionResult);
+            Tuple2<Double, DenseVector> predictionResult =
+                    
LogisticRegressionModelServable.predictOneDataPoint(features, coefficient);

Review Comment:
   Would it be more readable to instantiate an instance of 
`LogisticRegressionModelServable` and call 
`LogisticRegressionModelServable#transform(features)` here?



##########
flink-ml-servable-lib/src/main/java/org/apache/flink/ml/classification/logisticregression/LogisticRegressionModelServable.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.logisticregression;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.ml.linalg.BLAS;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorSerializer;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.ModelServable;
+import org.apache.flink.ml.servable.api.Row;
+import org.apache.flink.ml.servable.types.BasicType;
+import org.apache.flink.ml.servable.types.DataTypes;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ServableReadWriteUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** A Servable which can be used to classifies data in online inference. */
+public class LogisticRegressionModelServable
+        implements ModelServable<LogisticRegressionModelServable>,
+                LogisticRegressionModelParams<LogisticRegressionModelServable> 
{
+
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    private DenseVector coefficient;
+
+    public LogisticRegressionModelServable() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public DataFrame transform(DataFrame input) {
+        List<Double> predictionResults = new ArrayList<>();
+        List<DenseVector> rawPredictionResults = new ArrayList<>();
+
+        int featuresColIndex = input.getIndex(getFeaturesCol());
+        for (Row row : input.collect()) {
+            Vector features = (Vector) row.get(featuresColIndex);
+            Tuple2<Double, DenseVector> dataPoint = 
predictOneDataPoint(features, coefficient);
+            predictionResults.add(dataPoint.f0);
+            rawPredictionResults.add(dataPoint.f1);
+        }
+
+        input.addColumn(getPredictionCol(), DataTypes.DOUBLE, 
predictionResults);
+        input.addColumn(
+                getRawPredictionCol(), DataTypes.VECTOR(BasicType.DOUBLE), 
rawPredictionResults);
+
+        return input;
+    }
+
+    public LogisticRegressionModelServable setModelData(InputStream... 
modelDataInputs)
+            throws IOException {
+        Preconditions.checkArgument(modelDataInputs.length == 1);
+
+        DataInputViewStreamWrapper inputViewStreamWrapper =
+                new DataInputViewStreamWrapper(modelDataInputs[0]);
+
+        DenseVectorSerializer serializer = new DenseVectorSerializer();
+        coefficient = serializer.deserialize(inputViewStreamWrapper);
+
+        return this;
+    }
+
+    public static LogisticRegressionModelServable load(String path) throws 
IOException {
+        LogisticRegressionModelServable servable =
+                ServableReadWriteUtils.loadServableParam(
+                        path, LogisticRegressionModelServable.class);
+
+        try (InputStream fsDataInputStream = 
ServableReadWriteUtils.loadModelData(path)) {
+            DataInputViewStreamWrapper dataInputViewStreamWrapper =
+                    new DataInputViewStreamWrapper(fsDataInputStream);
+            DenseVectorSerializer serializer = new DenseVectorSerializer();
+            servable.coefficient = 
serializer.deserialize(dataInputViewStreamWrapper);
+            return servable;
+        }
+    }
+
+    /**
+     * The main logic that predicts one input data point.
+     *
+     * @param feature The input feature.
+     * @param coefficient The model parameters.
+     * @return The prediction label and the raw probabilities.
+     */
+    public static Tuple2<Double, DenseVector> predictOneDataPoint(

Review Comment:
   Should we make this method protected so that it won't be used by code 
outside the Flink ML repo to do online prediction?
   
   Would it be better to name this method `Tuple2<Double, DenseVector> 
transform(Vector feature)` so that it is more consistent with `DataFrame 
transform(DataFrame input)`?



##########
flink-ml-servable-lib/src/main/java/org/apache/flink/ml/classification/logisticregression/LogisticRegressionModelServable.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.logisticregression;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.ml.linalg.BLAS;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorSerializer;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.ModelServable;
+import org.apache.flink.ml.servable.api.Row;
+import org.apache.flink.ml.servable.types.BasicType;
+import org.apache.flink.ml.servable.types.DataTypes;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ServableReadWriteUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** A Servable which can be used to classifies data in online inference. */
+public class LogisticRegressionModelServable
+        implements ModelServable<LogisticRegressionModelServable>,
+                LogisticRegressionModelParams<LogisticRegressionModelServable> 
{
+
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    private DenseVector coefficient;
+
+    public LogisticRegressionModelServable() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public DataFrame transform(DataFrame input) {
+        List<Double> predictionResults = new ArrayList<>();
+        List<DenseVector> rawPredictionResults = new ArrayList<>();
+
+        int featuresColIndex = input.getIndex(getFeaturesCol());
+        for (Row row : input.collect()) {
+            Vector features = (Vector) row.get(featuresColIndex);
+            Tuple2<Double, DenseVector> dataPoint = 
predictOneDataPoint(features, coefficient);
+            predictionResults.add(dataPoint.f0);
+            rawPredictionResults.add(dataPoint.f1);
+        }
+
+        input.addColumn(getPredictionCol(), DataTypes.DOUBLE, 
predictionResults);
+        input.addColumn(
+                getRawPredictionCol(), DataTypes.VECTOR(BasicType.DOUBLE), 
rawPredictionResults);
+
+        return input;
+    }
+
+    public LogisticRegressionModelServable setModelData(InputStream... 
modelDataInputs)
+            throws IOException {
+        Preconditions.checkArgument(modelDataInputs.length == 1);
+
+        DataInputViewStreamWrapper inputViewStreamWrapper =
+                new DataInputViewStreamWrapper(modelDataInputs[0]);
+
+        DenseVectorSerializer serializer = new DenseVectorSerializer();
+        coefficient = serializer.deserialize(inputViewStreamWrapper);
+
+        return this;
+    }
+
+    public static LogisticRegressionModelServable load(String path) throws 
IOException {
+        LogisticRegressionModelServable servable =
+                ServableReadWriteUtils.loadServableParam(
+                        path, LogisticRegressionModelServable.class);
+
+        try (InputStream fsDataInputStream = 
ServableReadWriteUtils.loadModelData(path)) {
+            DataInputViewStreamWrapper dataInputViewStreamWrapper =
+                    new DataInputViewStreamWrapper(fsDataInputStream);
+            DenseVectorSerializer serializer = new DenseVectorSerializer();
+            servable.coefficient = 
serializer.deserialize(dataInputViewStreamWrapper);
+            return servable;
+        }
+    }
+
+    /**
+     * The main logic that predicts one input data point.
+     *
+     * @param feature The input feature.
+     * @param coefficient The model parameters.
+     * @return The prediction label and the raw probabilities.
+     */
+    public static Tuple2<Double, DenseVector> predictOneDataPoint(
+            Vector feature, DenseVector coefficient) {
+        double dotValue = BLAS.dot(feature, coefficient);
+        double prob = 1 - 1.0 / (1.0 + Math.exp(dotValue));
+        return Tuple2.of(dotValue >= 0 ? 1. : 0., Vectors.dense(1 - prob, 
prob));
+    }
+
+    /**
+     * Serializes the model data into byte array which can be saved to 
external storage and then be
+     * used to update the Servable by `TransformerServable::setModelData` 
method.
+     *
+     * @param modelData The model data to be serialized.
+     * @return The serialized model data in byte array.
+     */
+    public static byte[] serialize(DenseVector modelData) throws IOException {

Review Comment:
   Does this mean that users need to know the internal data structure/type of 
the model data in order to use this public method?



##########
flink-ml-servable-lib/src/test/java/org/apache/flink/ml/classification/logisticregression/LogisticRegressionModelServableTest.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification.logisticregression;
+
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.Row;
+import org.apache.flink.ml.servable.types.BasicType;
+import org.apache.flink.ml.servable.types.DataTypes;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Tests the {@link LogisticRegressionModelServable}. */
+public class LogisticRegressionModelServableTest {
+
+    private static final DataFrame PREDICT_DATA =
+            new DataFrame(
+                    new ArrayList<>(Arrays.asList("features", "label", 
"weight")),
+                    new ArrayList<>(
+                            Arrays.asList(
+                                    DataTypes.VECTOR(BasicType.DOUBLE),
+                                    DataTypes.DOUBLE,
+                                    DataTypes.DOUBLE)),
+                    Arrays.asList(
+                            new Row(
+                                    new ArrayList<>(
+                                            Arrays.asList(Vectors.dense(1, 2, 
3, 4), 0., 1.))),
+                            new Row(
+                                    new ArrayList<>(
+                                            Arrays.asList(Vectors.dense(1, 2, 
3, 4), 0., 2.))),
+                            new Row(
+                                    new ArrayList<>(
+                                            Arrays.asList(Vectors.dense(2, 2, 
3, 4), 0., 3.))),
+                            new Row(
+                                    new ArrayList<>(
+                                            Arrays.asList(Vectors.dense(3, 2, 
3, 4), 0., 4.))),
+                            new Row(
+                                    new ArrayList<>(
+                                            Arrays.asList(Vectors.dense(4, 2, 
3, 4), 0., 5.))),
+                            new Row(
+                                    new ArrayList<>(
+                                            Arrays.asList(Vectors.dense(11, 2, 
3, 4), 1., 1.))),
+                            new Row(
+                                    new ArrayList<>(
+                                            Arrays.asList(Vectors.dense(12, 2, 
3, 4), 1., 2.))),
+                            new Row(
+                                    new ArrayList<>(
+                                            Arrays.asList(Vectors.dense(13, 2, 
3, 4), 1., 3.))),
+                            new Row(
+                                    new ArrayList<>(
+                                            Arrays.asList(Vectors.dense(14, 2, 
3, 4), 1., 4.))),
+                            new Row(
+                                    new ArrayList<>(
+                                            Arrays.asList(Vectors.dense(15, 2, 
3, 4), 1., 5.)))));
+
+    private static final DenseVector COEFFICIENT = Vectors.dense(0.525, 
-0.283, -0.425, -0.567);
+
+    private static final double TOLERANCE = 1e-7;
+
+    @Test
+    public void testParam() {
+        LogisticRegressionModelServable servable = new 
LogisticRegressionModelServable();
+        assertEquals("features", servable.getFeaturesCol());
+        assertEquals("prediction", servable.getPredictionCol());
+        assertEquals("rawPrediction", servable.getRawPredictionCol());
+
+        servable.setFeaturesCol("test_features")
+                .setPredictionCol("test_predictionCol")
+                .setRawPredictionCol("test_rawPredictionCol");
+        assertEquals("test_features", servable.getFeaturesCol());
+        assertEquals("test_predictionCol", servable.getPredictionCol());
+        assertEquals("test_rawPredictionCol", servable.getRawPredictionCol());
+    }
+
+    @Test
+    public void testTransform() throws IOException {
+        LogisticRegressionModelServable servable = new 
LogisticRegressionModelServable();
+
+        servable.setModelData(
+                new 
ByteArrayInputStream(LogisticRegressionModelServable.serialize(COEFFICIENT)));
+
+        DataFrame output = servable.transform(PREDICT_DATA);
+
+        verifyPredictionResult(
+                output,
+                servable.getFeaturesCol(),
+                servable.getPredictionCol(),
+                servable.getRawPredictionCol());
+    }
+
+    @Test
+    public void testSetModelData() throws IOException {

Review Comment:
   This test looks **exactly** the same as `testTransform()`. What is the 
purpose of having both tests?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to