This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fae135e33f5 Be sure block serialization can be interrupted (#16595)
fae135e33f5 is described below

commit fae135e33f5e57564ecfc6c3ffc13b373f7daf0a
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue Aug 26 08:58:18 2025 +0200

    Be sure block serialization can be interrupted (#16595)
---
 .../core/common/datablock/DataBlockBuilder.java    | 440 ++++++++++++---------
 .../runtime/operator/MailboxSendOperator.java      |   2 +-
 .../query/runtime/operator/MultiStageOperator.java |   1 +
 3 files changed, 247 insertions(+), 196 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index 3ade560b0b3..1aaebb5625e 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -91,93 +91,95 @@ public class DataBlockBuilder {
     PagedPinotOutputStream varSize = new PagedPinotOutputStream(allocator);
     Object2IntOpenHashMap<String> dictionary = new Object2IntOpenHashMap<>();
 
-    for (int rowId = 0; rowId < numRows; rowId++) {
-      Object[] row = rows.get(rowId);
-      for (int colId = 0; colId < numColumns; colId++) {
-        Object value = row[colId];
-        ColumnDataType storedType = storedTypes[colId];
+    interruptableLoop(0, numRows, 1000, (start, end) -> { // thread interruption is checked every 1000 rows
+      for (int rowId = start; rowId < end; rowId++) { // rows of this chunk only
+        Object[] row = rows.get(rowId);
+        for (int colId = 0; colId < numColumns; colId++) {
+          Object value = row[colId];
+          ColumnDataType storedType = storedTypes[colId];
+
+          if (storedType == ColumnDataType.OBJECT) {
+            // Custom intermediate result for aggregation function
+            assert aggFunctions != null;
+            if (value == null) {
+              setNull(fixedSize, varSize);
+            } else {
+              // NOTE: The first (numColumns - numAggFunctions) columns are 
key columns
+              int numAggFunctions = aggFunctions.length;
+              AggregationFunction aggFunction = aggFunctions[colId + 
numAggFunctions - numColumns];
+              setColumn(fixedSize, varSize, 
aggFunction.serializeIntermediateResult(value));
+            }
+            continue;
+          }
 
-        if (storedType == ColumnDataType.OBJECT) {
-          // Custom intermediate result for aggregation function
-          assert aggFunctions != null;
           if (value == null) {
-            setNull(fixedSize, varSize);
-          } else {
-            // NOTE: The first (numColumns - numAggFunctions) columns are key 
columns
-            int numAggFunctions = aggFunctions.length;
-            AggregationFunction aggFunction = aggFunctions[colId + 
numAggFunctions - numColumns];
-            setColumn(fixedSize, varSize, 
aggFunction.serializeIntermediateResult(value));
+            if (storedType == ColumnDataType.UNKNOWN) {
+              setNull(fixedSize, varSize);
+              continue;
+            } else {
+              nullBitmaps[colId].add(rowId);
+              value = nullPlaceholders[colId];
+            }
           }
-          continue;
-        }
 
-        if (value == null) {
-          if (storedType == ColumnDataType.UNKNOWN) {
-            setNull(fixedSize, varSize);
-            continue;
-          } else {
-            nullBitmaps[colId].add(rowId);
-            value = nullPlaceholders[colId];
+          // NOTE:
+          // We intentionally make the type casting very strict here (e.g. 
only accepting Integer for INT) to ensure the
+          // rows conform to the data schema. This can help catch the 
unexpected data type issues early.
+          switch (storedType) {
+            // Single-value column
+            case INT:
+              fixedSize.putInt((int) value);
+              break;
+            case LONG:
+              fixedSize.putLong((long) value);
+              break;
+            case FLOAT:
+              fixedSize.putFloat((float) value);
+              break;
+            case DOUBLE:
+              fixedSize.putDouble((double) value);
+              break;
+            case BIG_DECIMAL:
+              setColumn(fixedSize, varSize, (BigDecimal) value);
+              break;
+            case STRING:
+              int dictId = dictionary.computeIfAbsent((String) value, k -> 
dictionary.size());
+              fixedSize.putInt(dictId);
+              break;
+            case BYTES:
+              setColumn(fixedSize, varSize, (ByteArray) value);
+              break;
+            case MAP:
+              setColumn(fixedSize, varSize, (Map) value);
+              break;
+            // Multi-value column
+            case INT_ARRAY:
+              setColumn(fixedSize, varSize, (int[]) value);
+              break;
+            case LONG_ARRAY:
+              setColumn(fixedSize, varSize, (long[]) value);
+              break;
+            case FLOAT_ARRAY:
+              setColumn(fixedSize, varSize, (float[]) value);
+              break;
+            case DOUBLE_ARRAY:
+              setColumn(fixedSize, varSize, (double[]) value);
+              break;
+            case STRING_ARRAY:
+              setColumn(fixedSize, varSize, (String[]) value, dictionary);
+              break;
+            // Null
+            case UNKNOWN:
+              setNull(fixedSize, varSize);
+              break;
+
+            default:
+              throw new IllegalStateException(
+                  "Unsupported stored type: " + storedType + " for column: " + 
dataSchema.getColumnName(colId));
           }
         }
-
-        // NOTE:
-        // We intentionally make the type casting very strict here (e.g. only 
accepting Integer for INT) to ensure the
-        // rows conform to the data schema. This can help catch the unexpected 
data type issues early.
-        switch (storedType) {
-          // Single-value column
-          case INT:
-            fixedSize.putInt((int) value);
-            break;
-          case LONG:
-            fixedSize.putLong((long) value);
-            break;
-          case FLOAT:
-            fixedSize.putFloat((float) value);
-            break;
-          case DOUBLE:
-            fixedSize.putDouble((double) value);
-            break;
-          case BIG_DECIMAL:
-            setColumn(fixedSize, varSize, (BigDecimal) value);
-            break;
-          case STRING:
-            int dictId = dictionary.computeIfAbsent((String) value, k -> 
dictionary.size());
-            fixedSize.putInt(dictId);
-            break;
-          case BYTES:
-            setColumn(fixedSize, varSize, (ByteArray) value);
-            break;
-          case MAP:
-            setColumn(fixedSize, varSize, (Map) value);
-            break;
-          // Multi-value column
-          case INT_ARRAY:
-            setColumn(fixedSize, varSize, (int[]) value);
-            break;
-          case LONG_ARRAY:
-            setColumn(fixedSize, varSize, (long[]) value);
-            break;
-          case FLOAT_ARRAY:
-            setColumn(fixedSize, varSize, (float[]) value);
-            break;
-          case DOUBLE_ARRAY:
-            setColumn(fixedSize, varSize, (double[]) value);
-            break;
-          case STRING_ARRAY:
-            setColumn(fixedSize, varSize, (String[]) value, dictionary);
-            break;
-          // Null
-          case UNKNOWN:
-            setNull(fixedSize, varSize);
-            break;
-
-          default:
-            throw new IllegalStateException(
-                "Unsupported stored type: " + storedType + " for column: " + 
dataSchema.getColumnName(colId));
-        }
       }
-    }
+    });
 
     CompoundDataBuffer.Builder varBufferBuilder =
         new CompoundDataBuffer.Builder(ByteOrder.BIG_ENDIAN, 
true).addPagedOutputStream(varSize);
@@ -250,6 +252,7 @@ public class DataBlockBuilder {
     int numRows = columns.get(colId).length;
 
     Object[] column = columns.get(colId);
+    int interruptableLoopStep = 10000; // max rows serialized between two thread-interruption checks
 
     // NOTE:
     // We intentionally make the type casting very strict here (e.g. only 
accepting Integer for INT) to ensure the
@@ -258,195 +261,225 @@ public class DataBlockBuilder {
       // Single-value column
       case INT: {
         int nullPlaceholder = (int) storedType.getNullPlaceholder();
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          Object value = column[rowId];
-          if (value == null) {
-            nullBitmap.add(rowId);
-            fixedSize.putInt(nullPlaceholder);
-          } else {
-            fixedSize.putInt((int) value);
+        interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+          for (int rowId = start; rowId < end; rowId++) { // rows of this chunk only
+            Object value = column[rowId];
+            if (value == null) {
+              nullBitmap.add(rowId);
+              fixedSize.putInt(nullPlaceholder);
+            } else {
+              fixedSize.putInt((int) value);
+            }
           }
-        }
+        });
         break;
       }
       case LONG: {
         long nullPlaceholder = (long) storedType.getNullPlaceholder();
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          Object value = column[rowId];
-          if (value == null) {
-            nullBitmap.add(rowId);
-            fixedSize.putLong(nullPlaceholder);
-          } else {
-            fixedSize.putLong((long) value);
+        interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+          for (int rowId = start; rowId < end; rowId++) { // rows of this chunk only
+            Object value = column[rowId];
+            if (value == null) {
+              nullBitmap.add(rowId);
+              fixedSize.putLong(nullPlaceholder);
+            } else {
+              fixedSize.putLong((long) value);
+            }
           }
-        }
+        });
         break;
       }
       case FLOAT: {
         float nullPlaceholder = (float) storedType.getNullPlaceholder();
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          Object value = column[rowId];
-          if (value == null) {
-            nullBitmap.add(rowId);
-            fixedSize.putFloat(nullPlaceholder);
-          } else {
-            fixedSize.putFloat((float) value);
+        interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+          for (int rowId = start; rowId < end; rowId++) { // rows of this chunk only
+            Object value = column[rowId];
+            if (value == null) {
+              nullBitmap.add(rowId);
+              fixedSize.putFloat(nullPlaceholder);
+            } else {
+              fixedSize.putFloat((float) value);
+            }
           }
-        }
+        });
         break;
       }
       case DOUBLE: {
         double nullPlaceholder = (double) storedType.getNullPlaceholder();
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          Object value = column[rowId];
-          if (value == null) {
-            nullBitmap.add(rowId);
-            fixedSize.putDouble(nullPlaceholder);
-          } else {
-            fixedSize.putDouble((double) value);
+        interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+          for (int rowId = start; rowId < end; rowId++) { // rows of this chunk only
+            Object value = column[rowId];
+            if (value == null) {
+              nullBitmap.add(rowId);
+              fixedSize.putDouble(nullPlaceholder);
+            } else {
+              fixedSize.putDouble((double) value);
+            }
           }
-        }
+        });
         break;
       }
       case BIG_DECIMAL: {
         BigDecimal nullPlaceholder = (BigDecimal) 
storedType.getNullPlaceholder();
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          Object value = column[rowId];
-          if (value == null) {
-            nullBitmap.add(rowId);
-            setColumn(fixedSize, varSize, nullPlaceholder);
-          } else {
-            setColumn(fixedSize, varSize, (BigDecimal) value);
+        interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+          for (int rowId = start; rowId < end; rowId++) { // rows of this chunk only
+            Object value = column[rowId];
+            if (value == null) {
+              nullBitmap.add(rowId);
+              setColumn(fixedSize, varSize, nullPlaceholder);
+            } else {
+              setColumn(fixedSize, varSize, (BigDecimal) value);
+            }
           }
-        }
+        });
         break;
       }
       case STRING: {
         ToIntFunction<String> didSupplier = k -> dictionary.size();
         int nullPlaceHolder = dictionary.computeIfAbsent((String) 
storedType.getNullPlaceholder(), didSupplier);
 
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          Object value = column[rowId];
-          if (value == null) {
-            nullBitmap.add(rowId);
-            fixedSize.putInt(nullPlaceHolder);
-          } else {
-            int dictId = dictionary.computeIfAbsent((String) value, 
didSupplier);
-            fixedSize.putInt(dictId);
+        interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+          for (int rowId = start; rowId < end; rowId++) { // rows of this chunk only
+            Object value = column[rowId];
+            if (value == null) {
+              nullBitmap.add(rowId);
+              fixedSize.putInt(nullPlaceHolder);
+            } else {
+              int dictId = dictionary.computeIfAbsent((String) value, 
didSupplier);
+              fixedSize.putInt(dictId);
+            }
           }
-        }
+        });
         break;
       }
       case BYTES: {
         ByteArray nullPlaceholder = (ByteArray) 
storedType.getNullPlaceholder();
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          Object value = column[rowId];
-          if (value == null) {
-            nullBitmap.add(rowId);
-            setColumn(fixedSize, varSize, nullPlaceholder);
-          } else {
-            setColumn(fixedSize, varSize, (ByteArray) value);
+        interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+          for (int rowId = start; rowId < end; rowId++) { // rows of this chunk only
+            Object value = column[rowId];
+            if (value == null) {
+              nullBitmap.add(rowId);
+              setColumn(fixedSize, varSize, nullPlaceholder);
+            } else {
+              setColumn(fixedSize, varSize, (ByteArray) value);
+            }
           }
-        }
+        });
         break;
       }
       case MAP: {
         Map nullPlaceholder = (Map) storedType.getNullPlaceholder();
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          Object value = column[rowId];
-          if (value == null) {
-            nullBitmap.add(rowId);
-            setColumn(fixedSize, varSize, nullPlaceholder);
-          } else {
-            setColumn(fixedSize, varSize, (Map) value);
+        interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+          for (int rowId = start; rowId < end; rowId++) { // rows of this chunk only
+            Object value = column[rowId];
+            if (value == null) {
+              nullBitmap.add(rowId);
+              setColumn(fixedSize, varSize, nullPlaceholder);
+            } else {
+              setColumn(fixedSize, varSize, (Map) value);
+            }
           }
-        }
+        });
         break;
       }
       // Multi-value column
       case INT_ARRAY: {
         int[] nullPlaceholder = (int[]) storedType.getNullPlaceholder();
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          Object value = column[rowId];
-          if (value == null) {
-            nullBitmap.add(rowId);
-            setColumn(fixedSize, varSize, nullPlaceholder);
-          } else {
-            setColumn(fixedSize, varSize, (int[]) value);
+        interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+          for (int rowId = start; rowId < end; rowId++) { // rows of this chunk only
+            Object value = column[rowId];
+            if (value == null) {
+              nullBitmap.add(rowId);
+              setColumn(fixedSize, varSize, nullPlaceholder);
+            } else {
+              setColumn(fixedSize, varSize, (int[]) value);
+            }
           }
-        }
+        });
         break;
       }
       case LONG_ARRAY: {
         long[] nullPlaceholder = (long[]) storedType.getNullPlaceholder();
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          Object value = column[rowId];
-          if (value == null) {
-            nullBitmap.add(rowId);
-            setColumn(fixedSize, varSize, nullPlaceholder);
-          } else {
-            setColumn(fixedSize, varSize, (long[]) value);
+        interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+          for (int rowId = start; rowId < end; rowId++) { // rows of this chunk only
+            Object value = column[rowId];
+            if (value == null) {
+              nullBitmap.add(rowId);
+              setColumn(fixedSize, varSize, nullPlaceholder);
+            } else {
+              setColumn(fixedSize, varSize, (long[]) value);
+            }
           }
-        }
+        });
         break;
       }
       case FLOAT_ARRAY: {
         float[] nullPlaceholder = (float[]) storedType.getNullPlaceholder();
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          Object value = column[rowId];
-          if (value == null) {
-            nullBitmap.add(rowId);
-            setColumn(fixedSize, varSize, nullPlaceholder);
-          } else {
-            setColumn(fixedSize, varSize, (float[]) value);
+        interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+          for (int rowId = start; rowId < end; rowId++) { // rows of this chunk only
+            Object value = column[rowId];
+            if (value == null) {
+              nullBitmap.add(rowId);
+              setColumn(fixedSize, varSize, nullPlaceholder);
+            } else {
+              setColumn(fixedSize, varSize, (float[]) value);
+            }
           }
-        }
+        });
         break;
       }
       case DOUBLE_ARRAY: {
         double[] nullPlaceholder = (double[]) storedType.getNullPlaceholder();
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          Object value = column[rowId];
-          if (value == null) {
-            nullBitmap.add(rowId);
-            setColumn(fixedSize, varSize, nullPlaceholder);
-          } else {
-            setColumn(fixedSize, varSize, (double[]) value);
+        interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+          for (int rowId = start; rowId < end; rowId++) { // rows of this chunk only
+            Object value = column[rowId];
+            if (value == null) {
+              nullBitmap.add(rowId);
+              setColumn(fixedSize, varSize, nullPlaceholder);
+            } else {
+              setColumn(fixedSize, varSize, (double[]) value);
+            }
           }
-        }
+        });
         break;
       }
       case STRING_ARRAY: {
         String[] nullPlaceholder = (String[]) storedType.getNullPlaceholder();
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          Object value = column[rowId];
-          if (value == null) {
-            nullBitmap.add(rowId);
-            setColumn(fixedSize, varSize, nullPlaceholder, dictionary);
-          } else {
-            setColumn(fixedSize, varSize, (String[]) value, dictionary);
+        interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+          for (int rowId = start; rowId < end; rowId++) { // rows of this chunk only
+            Object value = column[rowId];
+            if (value == null) {
+              nullBitmap.add(rowId);
+              setColumn(fixedSize, varSize, nullPlaceholder, dictionary);
+            } else {
+              setColumn(fixedSize, varSize, (String[]) value, dictionary);
+            }
           }
-        }
+        });
         break;
       }
       // Custom intermediate result for aggregation function
       case OBJECT: {
         assert aggFunction != null;
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          Object value = column[rowId];
-          if (value == null) {
-            setNull(fixedSize, varSize);
-          } else {
-            setColumn(fixedSize, varSize, 
aggFunction.serializeIntermediateResult(value));
+        interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+          for (int rowId = start; rowId < end; rowId++) { // rows of this chunk only
+            Object value = column[rowId];
+            if (value == null) {
+              setNull(fixedSize, varSize);
+            } else {
+              setColumn(fixedSize, varSize, 
aggFunction.serializeIntermediateResult(value));
+            }
           }
-        }
+        });
         break;
       }
       // Null
       case UNKNOWN:
-        for (int rowId = 0; rowId < numRows; rowId++) {
-          setNull(fixedSize, varSize);
-        }
+        interruptableLoop(0, numRows, interruptableLoopStep, (start, end) -> {
+          for (int rowId = start; rowId < end; rowId++) { // rows of this chunk only
+            setNull(fixedSize, varSize);
+          }
+        });
         break;
 
       default:
@@ -622,4 +655,39 @@ public class DataBlockBuilder {
     fixedSize.putInt(0);
     varSize.writeInt(CustomObject.NULL_TYPE_VALUE);
   }
+
+  /// Iterates over the rows in `[start, max)` in chunks of at most `step` rows.
+  ///
+  /// Before each chunk, the current thread's interruption flag is checked (without clearing it) and a
+  /// [RuntimeException] is thrown if it is set. Each chunk is then processed by `loop` without further checks,
+  /// so `step` bounds the amount of work done between two interruption checks.
+  ///
+  /// @param start first row to process (inclusive)
+  /// @param max upper bound of the rows to process (exclusive)
+  /// @param step maximum number of rows processed between two interruption checks; must be positive
+  /// @param loop processes the rows in `[from, to)` of one chunk
+  /// @throws IllegalArgumentException if `step` is not positive
+  /// @throws RuntimeException if the current thread is interrupted before all rows are processed
+  /// @throws IOException if `loop` throws it
+  static void interruptableLoop(int start, int max, int step, InnerLoop loop) throws IOException {
+    if (step <= 0) {
+      throw new IllegalArgumentException("Step must be positive, got: " + step);
+    }
+    int from = start;
+    while (from < max) {
+      if (Thread.currentThread().isInterrupted()) {
+        throw new RuntimeException("Thread interrupted while processing rows. Rows processed so far: " + from);
+      }
+      // Bounded increment: `from + step` could overflow when `max` is close to Integer.MAX_VALUE
+      int to = from + Math.min(step, max - from);
+      loop.run(from, to);
+      from = to;
+    }
+  }
+
+  /// Processes the rows in `[from, to)`; the per-chunk body used by [#interruptableLoop].
+  @FunctionalInterface
+  interface InnerLoop {
+    void run(int from, int to) throws IOException;
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index e2a1009d53f..c1a0fcf6e95 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -225,7 +225,7 @@ public class MailboxSendOperator extends MultiStageOperator 
{
     } catch (Exception e) {
       ErrorMseBlock errorBlock = ErrorMseBlock.fromException(e);
       try {
-        LOGGER.error("Exception while transferring data on opChain: {}", 
_context.getId());
+        LOGGER.error("Exception while transferring data on opChain: {}", 
_context.getId(), e); // trailing exception arg: presumably logged with its stack trace (SLF4J-style)
         sendEos(errorBlock);
       } catch (Exception e2) {
         LOGGER.error("Exception while sending error block.", e2);
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index 013aa705331..abd61d2ea81 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -125,6 +125,7 @@ public abstract class MultiStageOperator
       try {
         nextBlock = getNextBlock();
       } catch (Exception e) {
+        logger().warn("Operator {}: Exception while processing next block", 
_operatorId, e); // logged before being converted into an error block below
         nextBlock = ErrorMseBlock.fromException(e);
       }
       int numRows = nextBlock instanceof MseBlock.Data ? ((MseBlock.Data) 
nextBlock).getNumRows() : 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to