This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f72b494858 Modify MSE operators to release memory once computation finished (#15977)
f72b494858 is described below
commit f72b4948588d361df86746a0557d3f095f9cba2a
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Wed Jun 4 12:41:37 2025 +0200
Modify MSE operators to release memory once computation finished (#15977)
---
.../query/runtime/operator/AggregateOperator.java | 14 +++++++++++---
.../query/runtime/operator/AsofJoinOperator.java | 10 +++++++++-
.../query/runtime/operator/BaseJoinOperator.java | 6 ++++++
.../query/runtime/operator/HashJoinOperator.java | 22 ++++++++++++++++++++--
.../runtime/operator/NonEquiJoinOperator.java | 8 ++++++++
.../runtime/operator/WindowAggregateOperator.java | 17 ++++++++---------
6 files changed, 62 insertions(+), 15 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 705d171d6c..cbb148ebed 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -73,8 +73,10 @@ public class AggregateOperator extends MultiStageOperator {
private final MultiStageOperator _input;
private final DataSchema _resultSchema;
private final AggregationFunction<?, ?>[] _aggFunctions;
- private final MultistageAggregationExecutor _aggregationExecutor;
- private final MultistageGroupByExecutor _groupByExecutor;
+ @Nullable
+ private MultistageAggregationExecutor _aggregationExecutor;
+ @Nullable
+ private MultistageGroupByExecutor _groupByExecutor;
@Nullable
private MseBlock.Eos _eosBlock;
@@ -205,13 +207,17 @@ public class AggregateOperator extends MultiStageOperator {
if (finalBlock.isError()) {
return finalBlock;
}
- return produceAggregatedBlock();
+ MseBlock mseBlock = produceAggregatedBlock();
+ _aggregationExecutor = null;
+ _groupByExecutor = null;
+ return mseBlock;
}
private MseBlock produceAggregatedBlock() {
if (_aggregationExecutor != null) {
return new RowHeapDataBlock(_aggregationExecutor.getResult(), _resultSchema, _aggFunctions);
} else {
+ assert _groupByExecutor != null;
List<Object[]> rows;
if (_comparator != null) {
rows = _groupByExecutor.getResult(_comparator, _groupTrimSize);
@@ -253,6 +259,7 @@ public class AggregateOperator extends MultiStageOperator {
* @return the last block, which must always be either an error or the end of the stream
*/
private MseBlock.Eos consumeGroupBy() {
+ assert _groupByExecutor != null;
MseBlock block = _input.nextBlock();
while (block.isData()) {
_groupByExecutor.processBlock((MseBlock.Data) block);
@@ -268,6 +275,7 @@ public class AggregateOperator extends MultiStageOperator {
* @return the last block, which must always be either an error or the end of the stream
*/
private MseBlock.Eos consumeAggregation() {
+ assert _aggregationExecutor != null;
MseBlock block = _input.nextBlock();
while (block.isData()) {
_aggregationExecutor.processBlock((MseBlock.Data) block);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java
index 5c98a9ce29..3dd0a7781e 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java
@@ -39,7 +39,8 @@ public class AsofJoinOperator extends BaseJoinOperator {
// The right table is a map from the hash key (columns in the ON join condition) to a sorted map of match key
// (column in the MATCH_CONDITION) to rows.
- private final Map<Object, NavigableMap<Comparable<?>, Object[]>> _rightTable;
+ @Nullable
+ private Map<Object, NavigableMap<Comparable<?>, Object[]>> _rightTable;
private final KeySelector<?> _leftKeySelector;
private final KeySelector<?> _rightKeySelector;
private final MatchConditionType _matchConditionType;
@@ -68,6 +69,7 @@ public class AsofJoinOperator extends BaseJoinOperator {
@Override
protected void addRowsToRightTable(List<Object[]> rows) {
+ assert _rightTable != null : "Right table should not be null when adding rows";
for (Object[] row : rows) {
Comparable<?> matchKey = (Comparable<?>) row[_rightMatchKeyIndex];
if (matchKey == null) {
@@ -86,8 +88,14 @@ public class AsofJoinOperator extends BaseJoinOperator {
// no-op
}
+ @Override
+ protected void onEosProduced() {
+ _rightTable = null; // Release memory in case we keep the operator around for a while
+ }
+
@Override
protected List<Object[]> buildJoinedRows(MseBlock.Data leftBlock) {
+ assert _rightTable != null : "Right table should not be null when building joined rows";
List<Object[]> rows = new ArrayList<>();
for (Object[] leftRow : leftBlock.asRowHeap().getRows()) {
Comparable<?> matchKey = (Comparable<?>) leftRow[_leftMatchKeyIndex];
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
index 5040bae620..ff5ad30cfb 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
@@ -169,9 +169,15 @@ public abstract class BaseJoinOperator extends MultiStageOperator {
}
MseBlock mseBlock = buildJoinedDataBlock();
LOGGER.trace("Returning {} for join operator", mseBlock);
+ if (mseBlock.isEos()) {
+ _eos = (MseBlock.Eos) mseBlock;
+ onEosProduced();
+ }
return mseBlock;
}
+ protected abstract void onEosProduced();
+
protected void buildRightTable() {
LOGGER.trace("Building right table for join operator");
long startTime = System.currentTimeMillis();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 0cb1032367..9ad75c420f 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -24,6 +24,7 @@ import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.partitioning.KeySelectorFactory;
@@ -51,11 +52,13 @@ public class HashJoinOperator extends BaseJoinOperator {
private final KeySelector<?> _leftKeySelector;
private final KeySelector<?> _rightKeySelector;
- private final LookupTable _rightTable;
+ @Nullable
+ private LookupTable _rightTable;
// Track matched right rows for right join and full join to output non-matched right rows.
// TODO: Revisit whether we should use IntList or RoaringBitmap for smaller memory footprint.
// TODO: Optimize this
- private final Map<Object, BitSet> _matchedRightRows;
+ @Nullable
+ private Map<Object, BitSet> _matchedRightRows;
public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator leftInput, DataSchema leftSchema,
MultiStageOperator rightInput, JoinNode node) {
@@ -93,6 +96,7 @@ public class HashJoinOperator extends BaseJoinOperator {
@Override
protected void addRowsToRightTable(List<Object[]> rows) {
+ assert _rightTable != null : "Right table should not be null when adding rows";
for (Object[] row : rows) {
_rightTable.addRow(_rightKeySelector.getKey(row), row);
}
@@ -100,11 +104,19 @@ public class HashJoinOperator extends BaseJoinOperator {
@Override
protected void finishBuildingRightTable() {
+ assert _rightTable != null : "Right table should not be null when finishing building";
_rightTable.finish();
}
+ @Override
+ protected void onEosProduced() {
+ _rightTable = null;
+ _matchedRightRows = null;
+ }
+
@Override
protected List<Object[]> buildJoinedRows(MseBlock.Data leftBlock) {
+ assert _rightTable != null : "Right table should not be null when building joined rows";
switch (_joinType) {
case SEMI:
return buildJoinedDataBlockSemi(leftBlock);
@@ -121,6 +133,7 @@ public class HashJoinOperator extends BaseJoinOperator {
}
private List<Object[]> buildJoinedDataBlockUniqueKeys(MseBlock.Data leftBlock) {
+ assert _rightTable != null : "Right table should not be null when building joined rows";
List<Object[]> leftRows = leftBlock.asRowHeap().getRows();
ArrayList<Object[]> rows = new ArrayList<>(leftRows.size());
@@ -149,6 +162,7 @@ public class HashJoinOperator extends BaseJoinOperator {
}
private List<Object[]> buildJoinedDataBlockDuplicateKeys(MseBlock.Data leftBlock) {
+ assert _rightTable != null : "Right table should not be null when building joined rows";
List<Object[]> leftRows = leftBlock.asRowHeap().getRows();
List<Object[]> rows = new ArrayList<>(leftRows.size());
@@ -197,6 +211,7 @@ public class HashJoinOperator extends BaseJoinOperator {
}
private List<Object[]> buildJoinedDataBlockSemi(MseBlock.Data leftBlock) {
+ assert _rightTable != null : "Right table should not be null when building joined rows";
List<Object[]> leftRows = leftBlock.asRowHeap().getRows();
List<Object[]> rows = new ArrayList<>(leftRows.size());
@@ -212,6 +227,7 @@ public class HashJoinOperator extends BaseJoinOperator {
}
private List<Object[]> buildJoinedDataBlockAnti(MseBlock.Data leftBlock) {
+ assert _rightTable != null : "Right table should not be null when building joined rows";
List<Object[]> leftRows = leftBlock.asRowHeap().getRows();
List<Object[]> rows = new ArrayList<>(leftRows.size());
@@ -228,6 +244,8 @@ public class HashJoinOperator extends BaseJoinOperator {
@Override
protected List<Object[]> buildNonMatchRightRows() {
+ assert _rightTable != null : "Right table should not be null when building non-matched right rows";
+ assert _matchedRightRows != null : "Matched right rows should not be null when building non-matched right rows";
List<Object[]> rows = new ArrayList<>();
if (_rightTable.isKeysUnique()) {
for (Map.Entry<Object, Object> entry : _rightTable.entrySet()) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
index 34c3e99287..625b92fa11 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
+import javax.annotation.Nullable;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.plannode.JoinNode;
@@ -39,6 +40,7 @@ public class NonEquiJoinOperator extends BaseJoinOperator {
private final List<Object[]> _rightTable;
// Track matched right rows for right join and full join to output non-matched right rows.
// TODO: Revisit whether we should use IntList or RoaringBitmap for smaller memory footprint.
+ @Nullable
private BitSet _matchedRightRows;
public NonEquiJoinOperator(OpChainExecutionContext context, MultiStageOperator leftInput, DataSchema leftSchema,
@@ -67,6 +69,11 @@ public class NonEquiJoinOperator extends BaseJoinOperator {
}
}
+ @Override
+ protected void onEosProduced() {
+ _matchedRightRows = null;
+ }
+
@Override
protected List<Object[]> buildJoinedRows(MseBlock.Data leftBlock) {
ArrayList<Object[]> rows = new ArrayList<>();
@@ -106,6 +113,7 @@ public class NonEquiJoinOperator extends BaseJoinOperator {
@Override
protected List<Object[]> buildNonMatchRightRows() {
+ assert _matchedRightRows != null : "Matched right rows should not be null when building non-matched right rows";
int numRightRows = _rightTable.size();
int numMatchedRightRows = _matchedRightRows.cardinality();
if (numMatchedRightRows == numRightRows) {
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
index 2ac18a0647..f50fe2a5db 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
@@ -91,9 +91,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
private final MultiStageOperator _input;
private final DataSchema _resultSchema;
private final int[] _keys;
- private final WindowFrame _windowFrame;
private final WindowFunction[] _windowFunctions;
- private final Map<Key, List<Object[]>> _partitionRows = new HashMap<>();
private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
// Below are specific parameters to protect the window cache from growing too large.
@@ -125,12 +123,12 @@ public class WindowAggregateOperator extends MultiStageOperator {
for (int i = 0; i < numKeys; i++) {
_keys[i] = keys.get(i);
}
- _windowFrame = new WindowFrame(node.getWindowFrameType(), node.getLowerBound(), node.getUpperBound());
+ WindowFrame windowFrame = new WindowFrame(node.getWindowFrameType(), node.getLowerBound(), node.getUpperBound());
Preconditions.checkState(
- _windowFrame.isRowType() || ((_windowFrame.isUnboundedPreceding() || _windowFrame.isLowerBoundCurrentRow()) && (
- _windowFrame.isUnboundedFollowing() || _windowFrame.isUpperBoundCurrentRow())),
+ windowFrame.isRowType() || ((windowFrame.isUnboundedPreceding() || windowFrame.isLowerBoundCurrentRow()) && (
+ windowFrame.isUnboundedFollowing() || windowFrame.isUpperBoundCurrentRow())),
"RANGE window frame with offset PRECEDING / FOLLOWING is not supported");
- Preconditions.checkState(_windowFrame.getLowerBound() <= _windowFrame.getUpperBound(),
+ Preconditions.checkState(windowFrame.getLowerBound() <= windowFrame.getUpperBound(),
"Window frame lower bound can't be greater than upper bound");
List<RelFieldCollation> collations = node.getCollations();
List<RexExpression.FunctionCall> aggCalls = node.getAggCalls();
@@ -139,7 +137,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
for (int i = 0; i < numAggCalls; i++) {
RexExpression.FunctionCall aggCall = aggCalls.get(i);
_windowFunctions[i] =
- WindowFunctionFactory.constructWindowFunction(aggCall, inputSchema, collations, _windowFrame);
+ WindowFunctionFactory.constructWindowFunction(aggCall, inputSchema, collations, windowFrame);
}
Map<String, String> metadata = context.getOpChainMetadata();
@@ -210,6 +208,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
* @return the final block, which must be either an end of stream or an error.
*/
private MseBlock computeBlocks() {
+ Map<Key, List<Object[]>> partitionRows = new HashMap<>();
MseBlock block = _input.nextBlock();
while (block.isData()) {
List<Object[]> container = ((MseBlock.Data) block).asRowHeap().getRows();
@@ -231,7 +230,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
for (Object[] row : container) {
// TODO: Revisit null direction handling for all query types
Key key = AggregationUtils.extractRowKey(row, _keys);
- _partitionRows.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
+ partitionRows.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
}
_numRows += containerSize;
sampleAndCheckInterruption();
@@ -246,7 +245,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
ColumnDataType[] resultStoredTypes = _resultSchema.getStoredColumnDataTypes();
List<Object[]> rows = new ArrayList<>(_numRows);
- for (Map.Entry<Key, List<Object[]>> e : _partitionRows.entrySet()) {
+ for (Map.Entry<Key, List<Object[]>> e : partitionRows.entrySet()) {
List<Object[]> rowList = e.getValue();
// Each window function will return a list of results for each row in the input set
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]