yunfengzhou-hub commented on code in PR #155:
URL: https://github.com/apache/flink-ml/pull/155#discussion_r973815348


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/PolynomialExpansionTest.java:
##########
@@ -20,7 +20,7 @@
 

Review Comment:
   Let's add back the `NormalizerTest`.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/polynomialexpansion/PolynomialExpansion.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.polynomialexpansion;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.math3.util.ArithmeticUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A Transformer that expands the input vectors in polynomial space.
+ *
+ * <p>Take a 2-dimension vector as an example: `(x, y)`, if we want to expand 
it with degree 2, then
+ * we get `(x, x * x, y, x * y, y * y)`.
+ *
+ * <p>For more information about the polynomial expansion, see
+ * http://en.wikipedia.org/wiki/Polynomial_expansion.
+ */
+public class PolynomialExpansion
+        implements Transformer<PolynomialExpansion>,
+                PolynomialExpansionParams<PolynomialExpansion> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public PolynomialExpansion() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = 
TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), 
VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), 
getOutputCol()));
+
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(
+                                new PolynomialExpansionFunction(getDegree(), 
getInputCol()),
+                                outputTypeInfo);
+
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+    }
+
+    public static PolynomialExpansion load(StreamTableEnvironment env, String 
path)
+            throws IOException {
+        return ReadWriteUtils.loadStageParam(path);
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    /** Polynomial expansion function that expands a vector in polynomial 
space. */
+    private static class PolynomialExpansionFunction implements 
MapFunction<Row, Row> {

Review Comment:
   Spark has a detailed comment explaining how this function is implemented, 
as follows.
   ```java
   /**
    * The expansion is done via recursion. Given n features and degree d, the 
size after expansion is
    * (n + d choose d) (including 1 and first-order values). For example, let 
f([a, b, c], 3) be the
    * function that expands [a, b, c] to their monomials of degree 3. We have 
the following recursion:
    *
    * <blockquote>
    *    $$
    *    f([a, b, c], 3) &= f([a, b], 3) ++ f([a, b], 2) * c ++ f([a, b], 1) * 
c^2 ++ [c^3]
    *    $$
    * </blockquote>
    *
    * To handle sparsity, if c is zero, we can skip all monomials that contain 
it. We remember the
    * current index and increment it properly for sparse input.
    */
   ```
   Do you think we should also add a comment like this?



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/PolynomialExpansionTest.java:
##########
@@ -149,44 +135,56 @@ public void testOutputSchema() {
 
     @Test
     public void testSaveLoadAndTransform() throws Exception {
-        Normalizer normalizer =
-                new 
Normalizer().setInputCol("denseVec").setOutputCol("outputVec").setP(1.5);
+        PolynomialExpansion polynomialExpansion =
+                new PolynomialExpansion()
+                        .setInputCol("denseVec")
+                        .setOutputCol("outputVec")
+                        .setDegree(2);
 
-        Normalizer loadedNormalizer =
+        PolynomialExpansion loadedPolynomialExpansion =
                 TestUtils.saveAndReload(
-                        tEnv, normalizer, 
TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+                        tEnv, polynomialExpansion, 
TEMPORARY_FOLDER.newFolder().getAbsolutePath());
 
-        Table output = loadedNormalizer.transform(inputDataTable)[0];
-        verifyOutputResult(output, loadedNormalizer.getOutputCol(), 
EXPECTED_DENSE_OUTPUT);
+        Table output = loadedPolynomialExpansion.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedPolynomialExpansion.getOutputCol(), 
EXPECTED_DENSE_OUTPUT);
     }
 
     @Test
     public void testInvalidP() {
         try {
-            Normalizer normalizer =
-                    new 
Normalizer().setInputCol("denseVec").setOutputCol("outputVec").setP(0.5);
-            normalizer.transform(inputDataTable);
+            PolynomialExpansion polynomialExpansion =
+                    new PolynomialExpansion()
+                            .setInputCol("denseVec")
+                            .setOutputCol("outputVec")
+                            .setDegree(-1);
+            polynomialExpansion.transform(inputDataTable);
             fail();
         } catch (Exception e) {
-            assertEquals("Parameter p is given an invalid value 0.5", 
e.getMessage());
+            assertEquals("Parameter degree is given an invalid value -1", 
e.getMessage());
         }
     }
 
     @Test
     public void testDenseTransform() throws Exception {
-        Normalizer normalizer =
-                new 
Normalizer().setInputCol("denseVec").setOutputCol("outputVec").setP(1.5);
-
-        Table output = normalizer.transform(inputDataTable)[0];
-        verifyOutputResult(output, normalizer.getOutputCol(), 
EXPECTED_DENSE_OUTPUT);
+        PolynomialExpansion polynomialExpansion =

Review Comment:
   Could you please add test cases for degree > 3 and for vector size > 3? 
These test cases would help improve test coverage.



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/PolynomialExpansionTest.java:
##########
@@ -149,44 +135,56 @@ public void testOutputSchema() {
 
     @Test
     public void testSaveLoadAndTransform() throws Exception {
-        Normalizer normalizer =
-                new 
Normalizer().setInputCol("denseVec").setOutputCol("outputVec").setP(1.5);
+        PolynomialExpansion polynomialExpansion =
+                new PolynomialExpansion()
+                        .setInputCol("denseVec")
+                        .setOutputCol("outputVec")
+                        .setDegree(2);
 
-        Normalizer loadedNormalizer =
+        PolynomialExpansion loadedPolynomialExpansion =
                 TestUtils.saveAndReload(
-                        tEnv, normalizer, 
TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+                        tEnv, polynomialExpansion, 
TEMPORARY_FOLDER.newFolder().getAbsolutePath());
 
-        Table output = loadedNormalizer.transform(inputDataTable)[0];
-        verifyOutputResult(output, loadedNormalizer.getOutputCol(), 
EXPECTED_DENSE_OUTPUT);
+        Table output = loadedPolynomialExpansion.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedPolynomialExpansion.getOutputCol(), 
EXPECTED_DENSE_OUTPUT);
     }
 
     @Test
     public void testInvalidP() {

Review Comment:
   nit: `testInvalidParameter` or `testInvalidDegree` might be better.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/polynomialexpansion/PolynomialExpansion.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.polynomialexpansion;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.math3.util.ArithmeticUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A Transformer that expands the input vectors in polynomial space.
+ *
+ * <p>Take a 2-dimension vector as an example: `(x, y)`, if we want to expand 
it with degree 2, then
+ * we get `(x, x * x, y, x * y, y * y)`.
+ *
+ * <p>For more information about the polynomial expansion, see
+ * http://en.wikipedia.org/wiki/Polynomial_expansion.
+ */
+public class PolynomialExpansion
+        implements Transformer<PolynomialExpansion>,
+                PolynomialExpansionParams<PolynomialExpansion> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public PolynomialExpansion() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = 
TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), 
VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), 
getOutputCol()));
+
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(
+                                new PolynomialExpansionFunction(getDegree(), 
getInputCol()),
+                                outputTypeInfo);
+
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+    }
+
+    public static PolynomialExpansion load(StreamTableEnvironment env, String 
path)
+            throws IOException {
+        return ReadWriteUtils.loadStageParam(path);
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    /** Polynomial expansion function that expands a vector in polynomial 
space. */
+    private static class PolynomialExpansionFunction implements 
MapFunction<Row, Row> {
+        private final int degree;
+        private final String inputCol;
+
+        public PolynomialExpansionFunction(int degree, String inputCol) {
+            this.degree = degree;
+            this.inputCol = inputCol;
+        }
+
+        @Override
+        public Row map(Row row) throws Exception {
+            Vector vec = row.getFieldAs(inputCol);
+            if (vec == null) {
+                return row;
+            }
+            Vector outputVec = null;
+            if (vec instanceof DenseVector) {
+                int size = vec.size();
+                double[] retVals = new double[getResultVectorSize(size, 
degree) - 1];
+                expandDenseVector(((DenseVector) vec).values, size - 1, 
degree, 1.0, retVals, -1);
+                outputVec = new DenseVector(retVals);
+            } else if (vec instanceof SparseVector) {

Review Comment:
   How about adding an `else` branch that throws an exception when `!(vec 
instanceof DenseVector || vec instanceof SparseVector)`?



##########
docs/content/docs/operators/feature/polynomialexpansion.md:
##########
@@ -0,0 +1,160 @@
+---
+title: "PolynomialExpansion"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/polynomialexpansion.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## PolynomialExpansion
+
+A Transformer that expands the input vectors in polynomial space.
+
+<p>Take a 2-dimension vector as an example: `(x, y)`, if we want to expand it 
with degree 2, then

Review Comment:
   nit: Markdown documents do not need tags like `<p>`.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/polynomialexpansion/PolynomialExpansion.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.polynomialexpansion;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.math3.util.ArithmeticUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A Transformer that expands the input vectors in polynomial space.
+ *
+ * <p>Take a 2-dimension vector as an example: `(x, y)`, if we want to expand 
it with degree 2, then
+ * we get `(x, x * x, y, x * y, y * y)`.
+ *
+ * <p>For more information about the polynomial expansion, see
+ * http://en.wikipedia.org/wiki/Polynomial_expansion.
+ */
+public class PolynomialExpansion
+        implements Transformer<PolynomialExpansion>,
+                PolynomialExpansionParams<PolynomialExpansion> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public PolynomialExpansion() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) 
inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = 
TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), 
VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), 
getOutputCol()));
+
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(
+                                new PolynomialExpansionFunction(getDegree(), 
getInputCol()),
+                                outputTypeInfo);
+
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+    }
+
+    public static PolynomialExpansion load(StreamTableEnvironment env, String 
path)
+            throws IOException {
+        return ReadWriteUtils.loadStageParam(path);
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    /** Polynomial expansion function that expands a vector in polynomial 
space. */
+    private static class PolynomialExpansionFunction implements 
MapFunction<Row, Row> {
+        private final int degree;
+        private final String inputCol;
+
+        public PolynomialExpansionFunction(int degree, String inputCol) {
+            this.degree = degree;
+            this.inputCol = inputCol;
+        }
+
+        @Override
+        public Row map(Row row) throws Exception {
+            Vector vec = row.getFieldAs(inputCol);
+            if (vec == null) {
+                return row;
+            }
+            Vector outputVec = null;
+            if (vec instanceof DenseVector) {
+                int size = vec.size();
+                double[] retVals = new double[getResultVectorSize(size, 
degree) - 1];
+                expandDenseVector(((DenseVector) vec).values, size - 1, 
degree, 1.0, retVals, -1);
+                outputVec = new DenseVector(retVals);
+            } else if (vec instanceof SparseVector) {
+                SparseVector sparseVec = (SparseVector) vec;
+                int[] indices = sparseVec.indices;
+                double[] values = sparseVec.values;
+                int size = sparseVec.size();
+                int nnz = sparseVec.values.length;
+                int nnzPolySize = getResultVectorSize(nnz, degree);
+
+                Tuple2<Integer, int[]> polyIndices = Tuple2.of(0, new 
int[nnzPolySize - 1]);
+                Tuple2<Integer, double[]> polyValues = Tuple2.of(0, new 
double[nnzPolySize - 1]);
+                expandSparseVector(
+                        indices,
+                        values,
+                        nnz - 1,
+                        size - 1,
+                        degree,
+                        1.0,
+                        polyIndices,
+                        polyValues,
+                        -1);
+
+                outputVec =
+                        new SparseVector(
+                                getResultVectorSize(size, degree) - 1,
+                                polyIndices.f1,
+                                polyValues.f1);
+            }
+            return Row.join(row, Row.of(outputVec));
+        }
+
+        /** Calculates the length of the expended vector. */
+        static int getResultVectorSize(int num, int degree) {

Review Comment:
   nit: `private` is enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to