JingsongLi commented on code in PR #270:
URL: https://github.com/apache/flink-table-store/pull/270#discussion_r954495687


##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldListaggAgg.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/** listagg aggregate a field of a row. */
+public class FieldListaggAgg extends FieldAggregator {
+    // TODO: make it configurable by with clause
+    public static final char DELIMITER = '\n';
+
+    public FieldListaggAgg(LogicalType logicalType) {
+        super(logicalType);
+    }
+
+    @Override
+    void agg(
+            GenericRowData mergeRow,
+            RowData inRow,
+            RowData.FieldGetter[] fieldGetters,
+            int fieldPos) {
+        Object concatenate;
+        Object mergeField = fieldGetters[fieldPos].getFieldOrNull(mergeRow);
+        Object inField = fieldGetters[fieldPos].getFieldOrNull(inRow);
+
+        if (inField == null) {
+            return;
+        } else if (mergeField == null) {
+            concatenate = inField;
+        } else {
+            // ordered by type root definition
+            switch (fieldType.getTypeRoot()) {
+                case VARCHAR:
+                    // TODO: ensure not VARCHAR(n)
+                    StringData mergeFieldSD = (StringData) mergeField;
+                    StringData inFieldSD = (StringData) inField;
+                    // currently, only support String Type. VARCHAR will lead 
to overflow easily
+                    if (((VarCharType) fieldType).getLength() != 
Integer.MAX_VALUE) {

Review Comment:
   We don't need to check for `VARCHAR(n)`; let it go.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldListaggAgg.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/** listagg aggregate a field of a row. */
+public class FieldListaggAgg extends FieldAggregator {
+    // TODO: make it configurable by with clause
+    public static final char DELIMITER = '\n';

Review Comment:
   The default value of the separator is `,` (comma).



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldListaggAgg.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+/** listagg aggregate a field of a row. */
+public class FieldListaggAgg extends FieldAggregator {
+    // TODO: make it configurable by with clause
+    public static final char DELIMITER = '\n';
+
+    public FieldListaggAgg(LogicalType logicalType) {
+        super(logicalType);
+    }
+
+    @Override
+    void agg(
+            GenericRowData mergeRow,
+            RowData inRow,
+            RowData.FieldGetter[] fieldGetters,
+            int fieldPos) {
+        Object concatenate;
+        Object mergeField = fieldGetters[fieldPos].getFieldOrNull(mergeRow);
+        Object inField = fieldGetters[fieldPos].getFieldOrNull(inRow);
+
+        if (inField == null) {
+            return;
+        } else if (mergeField == null) {
+            concatenate = inField;
+        } else {
+            // ordered by type root definition
+            switch (fieldType.getTypeRoot()) {
+                case VARCHAR:
+                    // TODO: ensure not VARCHAR(n)
+                    StringData mergeFieldSD = (StringData) mergeField;
+                    StringData inFieldSD = (StringData) inField;
+                    // currently, only support String Type. VARCHAR will lead 
to overflow easily
+                    if (((VarCharType) fieldType).getLength() != 
Integer.MAX_VALUE) {
+                        throw new IllegalArgumentException();
+                    }
+                    concatenate =
+                            StringData.fromString(mergeFieldSD.toString() + 
DELIMITER + inFieldSD);

Review Comment:
   You can cast `StringData` to `BinaryStringData`
   and use `BinaryStringDataUtil.concat`.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/RowAggregator.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+
+/** Provide an Aggregator for merge a new row data. */
+public class RowAggregator implements Serializable {

Review Comment:
   Maybe just merge this class into `AggregateMergeFunction`?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldPKAgg.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** aggregate by primary keys, so no need to change them except initializing. 
*/
+public class FieldPKAgg extends FieldAggregator {

Review Comment:
   Maybe just use `LAST_VALUE`?



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldMinAgg.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** min aggregate a field of a row. */
+public class FieldMinAgg extends FieldAggregator {
+    public FieldMinAgg(LogicalType logicalType) {
+        super(logicalType);
+    }
+
+    @Override
+    void agg(
+            GenericRowData mergeRow,
+            RowData inRow,
+            RowData.FieldGetter[] fieldGetters,
+            int fieldPos) {
+        Object min;
+        Object mergeField = fieldGetters[fieldPos].getFieldOrNull(mergeRow);
+        Object inField = fieldGetters[fieldPos].getFieldOrNull(inRow);
+
+        if (mergeField == null || inField == null) {
+            min = (mergeField == null ? inField : mergeField);
+        } else {
+            // ordered by type root definition
+            switch (fieldType.getTypeRoot()) {

Review Comment:
   Can we extract a `compare` method so that min and max can reuse the same logic?
   Maybe put it in `RowDataUtils`.



##########
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/aggregate/FieldAggregator.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact.aggregate;
+
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.Serializable;
+
+/** abstract class of aggregating a field of a row. */
+public abstract class FieldAggregator implements Serializable {
+    protected LogicalType fieldType;
+
+    public FieldAggregator(LogicalType logicalType) {
+        this.fieldType = logicalType;
+    }
+
+    static FieldAggregator createFieldAggregator(
+            LogicalType fieldType, String strAgg, boolean isPrimaryKey) {
+        final FieldAggregator fieldAggregator;
+        if (isPrimaryKey) {
+            fieldAggregator = new FieldPKAgg(fieldType);
+        } else {
+            // ordered by type root definition
+            switch (strAgg) {
+                case "sum":
+                    fieldAggregator = new FieldSumAgg(fieldType);
+                    break;
+                case "max":
+                    fieldAggregator = new FieldMaxAgg(fieldType);
+                    break;
+                case "min":
+                    fieldAggregator = new FieldMinAgg(fieldType);
+                    break;
+                case "last_non_null_value":
+                    fieldAggregator = new FieldLastNonNullValueAgg(fieldType);
+                    break;
+                case "last_value":
+                    fieldAggregator = new FieldLastValueAgg(fieldType);
+                    break;
+                case "listagg":
+                    fieldAggregator = new FieldListaggAgg(fieldType);
+                    break;
+                case "bool_or":
+                    fieldAggregator = new FieldBoolOrAgg(fieldType);
+                    break;
+                case "bool_and":
+                    fieldAggregator = new FieldBoolAndAgg(fieldType);
+                    break;
+                default:
+                    throw new ValidationException(
+                            "Use unsupported aggregation or spell aggregate 
function incorrectly!");
+            }
+        }
+        return fieldAggregator;
+    }
+
+    abstract void agg(

Review Comment:
   Why not just `Object agg(Object accumulator, Object inputField)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to