Jackie-Jiang commented on a change in pull request #7584:
URL: https://github.com/apache/pinot/pull/7584#discussion_r736029051
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
##########
@@ -330,6 +351,99 @@ public MinMaxRangePair deserialize(ByteBuffer byteBuffer) {
}
};
+ public static final ObjectSerDe<? extends ValueLongPair<Integer>>
INT_LONG_PAIR_SER_DE
+ = new ObjectSerDe<IntLongPair>() {
Review comment:
(minor) Declare the concrete pair type directly. Same for the other SerDes
```suggestion
public static final ObjectSerDe<IntLongPair> INT_LONG_PAIR_SER_DE = new
ObjectSerDe<IntLongPair>() {
```
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastDoubleValueWithTimeAggregationFunction.java
##########
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.DoubleLongPair;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+
+
+/**
+ * This function is used for LastWithTime calculations for data column with
double type.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression,
'double')
+ * <p>Following arguments are supported:
+ * <ul>
+ * <li>dataExpression: expression that contains the double data column to be
calculated last on</li>
+ * <li>timeExpression: expression that contains the column to be used to
decide which data is last, can be any
+ * Numeric column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class LastDoubleValueWithTimeAggregationFunction extends
LastWithTimeAggregationFunction<Double> {
+
+ private final static ValueLongPair<Double> DEFAULT_VALUE_TIME_PAIR
+ = new DoubleLongPair(Double.NaN, Long.MIN_VALUE);
+
+ public LastDoubleValueWithTimeAggregationFunction(
+ ExpressionContext dataCol,
+ ExpressionContext timeCol) {
+ super(dataCol, timeCol, ObjectSerDeUtils.DOUBLE_LONG_PAIR_SER_DE);
+ }
+
+ @Override
+ public ValueLongPair<Double> constructValueLongPair(Double value, long time)
{
+ return new DoubleLongPair(value, time);
+ }
+
+ @Override
+ public ValueLongPair<Double> getDefaultValueTimePair() {
+ return DEFAULT_VALUE_TIME_PAIR;
+ }
+
+ @Override
+ public void aggregateResultWithRawData(int length, AggregationResultHolder
aggregationResultHolder,
+ BlockValSet blockValSet, BlockValSet timeValSet) {
+ ValueLongPair<Double> defaultValueLongPair = getDefaultValueTimePair();
+ Double lastData = defaultValueLongPair.getValue();
+ long lastTime = defaultValueLongPair.getTime();
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ long[] timeValues = timeValSet.getLongValuesSV();
+ for (int i = 0; i < length; i++) {
+ double data = doubleValues[i];
+ long time = timeValues[i];
+ if (time >= lastTime) {
+ lastTime = time;
+ lastData = data;
+ }
+ }
+ setAggregationResult(aggregationResultHolder, lastData, lastTime);
+ }
+
+ @Override
+ public void aggregateGroupResultWithRawDataSv(int length, int[]
groupKeyArray,
Review comment:
(minor, code format) Suggest reformatting them, same for other places
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/LastWithTimeAggregationFunction.java
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.local.customobject.ValueLongPair;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * This function is used for LastWithTime calculations.
+ * <p>The function can be used as LastWithTime(dataExpression, timeExpression,
'dataType')
+ * <p>Following arguments are supported:
+ * <ul>
+ * <li>dataExpression: expression that contains the column to be calculated
last on</li>
+ * <li>timeExpression: expression that contains the column to be used to
decide which data is last, can be any
+ * Numeric column</li>
+ * <li>dataType: the data type of data column</li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public abstract class LastWithTimeAggregationFunction<V extends Comparable<V>>
+ extends BaseSingleInputAggregationFunction<ValueLongPair<V>, V> {
+ protected final ExpressionContext _timeCol;
+ private final ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>>
_objectSerDe;
+
+ public LastWithTimeAggregationFunction(ExpressionContext dataCol,
+ ExpressionContext timeCol,
+ ObjectSerDeUtils.ObjectSerDe<? extends ValueLongPair<V>> objectSerDe) {
+ super(dataCol);
+ _timeCol = timeCol;
+ _objectSerDe = objectSerDe;
+ }
+
+ public abstract ValueLongPair<V> constructValueLongPair(V value, long time);
+
+ public abstract ValueLongPair<V> getDefaultValueTimePair();
+
+ public abstract void aggregateResultWithRawData(int length,
AggregationResultHolder aggregationResultHolder,
+ BlockValSet blockValSet, BlockValSet timeValSet);
+
+ public abstract void aggregateGroupResultWithRawDataSv(int length,
+ int[] groupKeyArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet,
+ BlockValSet timeValSet);
+
+ public abstract void aggregateGroupResultWithRawDataMv(int length,
+ int[][] groupKeysArray,
+ GroupByResultHolder groupByResultHolder,
+ BlockValSet blockValSet,
+ BlockValSet timeValSet);
+
+ @Override
+ public AggregationFunctionType getType() {
+ return AggregationFunctionType.LASTWITHTIME;
+ }
+
+ @Override
+ public AggregationResultHolder createAggregationResultHolder() {
+ return new ObjectAggregationResultHolder();
+ }
+
+ @Override
+ public GroupByResultHolder createGroupByResultHolder(int initialCapacity,
int maxCapacity) {
+ return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+ }
+
+ @Override
+ public void aggregate(int length, AggregationResultHolder
aggregationResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+ BlockValSet blockTimeSet = blockValSetMap.get(_timeCol);
+ if (blockValSet.getValueType() != DataType.BYTES) {
+ aggregateResultWithRawData(length, aggregationResultHolder, blockValSet,
blockTimeSet);
+ } else {
+ ValueLongPair<V> defaultValueLongPair = getDefaultValueTimePair();
+ V lastData = defaultValueLongPair.getValue();
+ long lastTime = defaultValueLongPair.getTime();
+ // Serialized LastPair
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ for (int i = 0; i < length; i++) {
+ ValueLongPair<V> lastWithTimePair =
_objectSerDe.deserialize(bytesValues[i]);
+ V data = lastWithTimePair.getValue();
+ long time = lastWithTimePair.getTime();
+ if (time >= lastTime) {
+ lastTime = time;
+ lastData = data;
+ }
+ }
+ setAggregationResult(aggregationResultHolder, lastData, lastTime);
+ }
+ }
+
+ protected void setAggregationResult(AggregationResultHolder
aggregationResultHolder, V data, long time) {
+ ValueLongPair lastWithTimePair = aggregationResultHolder.getResult();
+ if (lastWithTimePair == null || time >= lastWithTimePair.getTime()) {
+ aggregationResultHolder.setValue(constructValueLongPair(data, time));
+ }
+ }
+
+ @Override
+ public void aggregateGroupBySV(int length, int[] groupKeyArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+ BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+ if (blockValSet.getValueType() != DataType.BYTES) {
+ aggregateGroupResultWithRawDataSv(length, groupKeyArray,
groupByResultHolder,
+ blockValSet, timeValSet);
+ } else {
+ // Serialized LastPair
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ for (int i = 0; i < length; i++) {
+ ValueLongPair<V> lastWithTimePair =
_objectSerDe.deserialize(bytesValues[i]);
+ setGroupByResult(groupKeyArray[i],
+ groupByResultHolder,
+ lastWithTimePair.getValue(),
+ lastWithTimePair.getTime());
+ }
+ }
+ }
+
+ @Override
+ public void aggregateGroupByMV(int length, int[][] groupKeysArray,
GroupByResultHolder groupByResultHolder,
+ Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
+ BlockValSet timeValSet = blockValSetMap.get(_timeCol);
+ if (blockValSet.getValueType() != DataType.BYTES) {
+ aggregateGroupResultWithRawDataMv(length, groupKeysArray,
groupByResultHolder, blockValSet, timeValSet);
+ } else {
+ // Serialized ValueTimePair
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ for (int i = 0; i < length; i++) {
+ ValueLongPair<V> lastWithTimePair =
_objectSerDe.deserialize(bytesValues[i]);
+ V data = lastWithTimePair.getValue();
+ long time = lastWithTimePair.getTime();
+ for (int groupKey : groupKeysArray[i]) {
+ setGroupByResult(groupKey, groupByResultHolder, data, time);
+ }
+ }
+ }
+ }
+
+ protected void setGroupByResult(int groupKey, GroupByResultHolder
groupByResultHolder, V data, long time) {
+ ValueLongPair lastWithTimePair = groupByResultHolder.getResult(groupKey);
+ if (lastWithTimePair == null || time >= lastWithTimePair.getTime()) {
+ groupByResultHolder.setValueForKey(groupKey,
constructValueLongPair(data, time));
+ }
+ }
+
+ @Override
+ public ValueLongPair<V> extractAggregationResult(AggregationResultHolder
aggregationResultHolder) {
+ ValueLongPair lastWithTimePair = aggregationResultHolder.getResult();
+ if (lastWithTimePair == null) {
+ return getDefaultValueTimePair();
+ } else {
+ return lastWithTimePair;
+ }
+ }
+
+ @Override
+ public ValueLongPair<V> extractGroupByResult(GroupByResultHolder
groupByResultHolder, int groupKey) {
+ ValueLongPair<V> lastWithTimePair =
groupByResultHolder.getResult(groupKey);
+ if (lastWithTimePair == null) {
+ return getDefaultValueTimePair();
+ } else {
+ return lastWithTimePair;
+ }
+ }
+
+ @Override
+ public ValueLongPair<V> merge(ValueLongPair<V> intermediateResult1,
ValueLongPair<V> intermediateResult2) {
+ if (intermediateResult1.getTime() >= intermediateResult2.getTime()) {
+ return intermediateResult1;
+ } else {
+ return intermediateResult2;
+ }
+ }
+
+ @Override
+ public List<ExpressionContext> getInputExpressions() {
+ return Arrays.asList(_expression, _timeCol);
+ }
+
+ @Override
+ public boolean isIntermediateResultComparable() {
+ return true;
Review comment:
This should be `false`
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
##########
@@ -109,7 +115,12 @@ private ObjectSerDeUtils() {
Int2LongMap(23),
Long2LongMap(24),
Float2LongMap(25),
- Double2LongMap(26);
+ Double2LongMap(26),
+ IntValueTimePair(27),
Review comment:
Let's also change the enum name to match the Pair (`IntLongPair`,
`LongLongPair`, etc.)
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
##########
@@ -156,6 +157,46 @@ public static AggregationFunction
getAggregationFunction(FunctionContext functio
return new AvgAggregationFunction(firstArgument);
case MODE:
return new ModeAggregationFunction(arguments);
+ case LASTWITHTIME:
+ if (arguments.size() == 3) {
+ ExpressionContext timeCol = arguments.get(1);
+ ExpressionContext dataType = arguments.get(2);
+ if (dataType.getType() != ExpressionContext.Type.LITERAL) {
+ throw new IllegalArgumentException("Third argument of
lastWithTime Function should be literal."
+ + " The function can be used as lastWithTime(dataColumn,
timeColumn, 'dataType')");
+ }
+ FieldSpec.DataType fieldDataType
+ =
FieldSpec.DataType.valueOf(dataType.getLiteral().toUpperCase());
+ switch (fieldDataType) {
+ case BOOLEAN:
+ case INT:
+ return new LastIntValueWithTimeAggregationFunction(
+ firstArgument,
Review comment:
(minor, code format) We usually put arguments on the same line. Same for
other places
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]