This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b35755402d Short circuit mapper if there are consecutive record fetch failures (#17775)
9b35755402d is described below

commit 9b35755402dcecd0dc9a5ce0a7f8688e4a328201
Author: Shounak kulkarni <[email protected]>
AuthorDate: Fri Feb 27 16:52:49 2026 +0530

    Short circuit mapper if there are consecutive record fetch failures (#17775)
---
 .../segment/processing/mapper/SegmentMapper.java   |  44 ++++-
 .../processing/framework/SegmentMapperTest.java    | 217 +++++++++++++++++++++
 .../plugin/inputformat/avro/AvroRecordReader.java  |   9 +-
 .../plugin/inputformat/csv/CSVRecordReader.java    |  12 +-
 .../inputformat/csv/CSVRecordReaderTest.java       |  11 +-
 .../plugin/inputformat/json/JSONRecordReader.java  |  13 +-
 .../plugin/inputformat/orc/ORCRecordReader.java    |   9 +-
 .../parquet/ParquetAvroRecordReader.java           |   9 +-
 .../parquet/ParquetNativeRecordReader.java         |   9 +-
 .../inputformat/protobuf/ProtoBufRecordReader.java |   5 +-
 .../inputformat/thrift/ThriftRecordReader.java     |   3 +-
 .../config/table/ingestion/IngestionConfig.java    |  11 ++
 .../spi/data/readers/RecordFetchException.java     |  49 +++++
 .../pinot/spi/data/readers/RecordReader.java       |  39 +++-
 14 files changed, 415 insertions(+), 25 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index 03a25cd964a..2eb73b86bd8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -43,9 +43,11 @@ import 
org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
 import org.apache.pinot.segment.local.recordtransformer.RecordTransformerUtils;
 import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
 import org.apache.pinot.spi.recordtransformer.RecordTransformer;
@@ -177,16 +179,23 @@ public class SegmentMapper {
   }
 
 
-//   Returns true if the map phase can continue, false if it should terminate 
based on the configured threshold for
-//   intermediate file size during map phase.
+  // Returns true if the map phase can continue, false if it should terminate 
based on the configured threshold for
+  // intermediate file size during map phase.
   protected boolean completeMapAndTransformRow(RecordReader recordReader, 
GenericRow reuse, Consumer<Object> observer,
       int count, int totalCount) {
     observer.accept(String.format("Doing map phase on data from RecordReader 
(%d out of %d)", count, totalCount));
 
-    boolean continueOnError =
-        _processorConfig.getTableConfig().getIngestionConfig() != null && 
_processorConfig.getTableConfig()
-            .getIngestionConfig().isContinueOnError();
+    boolean continueOnError = false;
+    // skip short-circuiting if maxConsecutiveRecordFetchFailuresAllowed is 
not explicitly set
+    int maxConsecutiveRecordFetchFailuresAllowed = 0;
+    IngestionConfig ingestionConfig = 
_processorConfig.getTableConfig().getIngestionConfig();
+    if (ingestionConfig != null) {
+      continueOnError = ingestionConfig.isContinueOnError();
+      maxConsecutiveRecordFetchFailuresAllowed =
+          ingestionConfig.getMaxConsecutiveRecordFetchFailuresAllowed();
+    }
 
+    int consecutiveFetchFailures = 0;
     while (recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())) {
       try {
         reuse = recordReader.next(reuse);
@@ -197,19 +206,38 @@ public class SegmentMapper {
         _incompleteRowsFound += result.getIncompleteRowCount();
         _skippedRowsFound += result.getSkippedRowCount();
         _sanitizedRowsFound += result.getSanitizedRowCount();
+        // Reset consecutive fetch failures counter on successful read
+        consecutiveFetchFailures = 0;
       } catch (Exception e) {
-        String logMessage = "Caught exception while reading data.";
+        boolean isFetchError = e instanceof RecordFetchException;
+        String errorType = isFetchError ? "fetch" : "parse";
+        String logMessage = String.format("Caught %s exception while reading 
data.", errorType);
         observer.accept(new MinionTaskBaseObserverStats.StatusEntry.Builder()
             .withLevel(MinionTaskBaseObserverStats.StatusEntry.LogLevel.ERROR)
             .withStatus(logMessage + " Reason: " + e.getMessage())
             .build());
         if (!continueOnError) {
           throw new RuntimeException(logMessage, e);
+        } else if (isFetchError && maxConsecutiveRecordFetchFailuresAllowed > 
0) {
+          consecutiveFetchFailures++;
+          LOGGER.debug("{} Consecutive fetch failures: {}", 
consecutiveFetchFailures, logMessage, e);
+          if (consecutiveFetchFailures >= 
maxConsecutiveRecordFetchFailuresAllowed) {
+            String warningMessage = String.format(
+                "Stopping at record reader %d out of %d due to %d consecutive 
fetch failures. "
+                    + "This may indicate the reader is stuck and unable to 
advance. Last error: %s",
+                count, totalCount, consecutiveFetchFailures, e.getMessage());
+            observer.accept(new 
MinionTaskBaseObserverStats.StatusEntry.Builder()
+                
.withLevel(MinionTaskBaseObserverStats.StatusEntry.LogLevel.WARN)
+                .withStatus(warningMessage)
+                .build());
+            LOGGER.warn(warningMessage, e);
+            throw new RuntimeException(warningMessage, e);
+          }
         } else {
+          // Parse error - log and continue without counting toward threshold
           _throttledLogger.warn(logMessage + "Processing RecordReader " + 
count + " out of " + totalCount, e);
-          _incompleteRowsFound++;
-          continue;
         }
+        _incompleteRowsFound++;
       }
       reuse.clear();
     }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
index db040232f98..d1eb2f2d964 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
@@ -19,13 +19,16 @@
 package org.apache.pinot.core.segment.processing.framework;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import 
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
 import 
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
@@ -44,10 +47,13 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.TimestampConfig;
 import org.apache.pinot.spi.config.table.TimestampIndexGranularity;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
 import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.annotations.AfterClass;
@@ -58,6 +64,7 @@ import org.testng.annotations.Test;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 
 /**
@@ -312,6 +319,216 @@ public class SegmentMapperTest {
     return inputs.toArray(new Object[0][]);
   }
 
+  /**
+   * Test-only {@link RecordReader} that replays a scripted outcome per row: for the row at position {@code i},
+   * {@code actions.get(i)} decides whether {@code next()} returns the row (SUCCESS), throws a
+   * {@link RecordFetchException} (FETCH_FAIL) or throws a plain {@link IOException} (PARSE_FAIL).
+   * Used to test continueOnError and consecutiveFetchFailures short-circuit logic.
+   */
+  private static final class FailingRecordReader implements RecordReader {
+    enum Action {
+      SUCCESS,
+      FETCH_FAIL,
+      PARSE_FAIL
+    }
+
+    private final List<GenericRow> _rows;
+    private final List<Action> _actions;
+    /**
+     * If false, on FETCH_FAIL we do not advance the index (simulates stuck reader).
+     * If true, we advance so the next call can succeed (intermittent failures).
+     */
+    private final boolean _advanceOnFetchFail;
+    // Position of the row (and of its scripted action) that the next call to next() will act on.
+    private int _index;
+
+    /**
+     * @param rows rows handed out on SUCCESS, in order
+     * @param actions scripted outcome for each position in {@code rows}; must have the same size as {@code rows}
+     * @param advanceOnFetchFail whether a FETCH_FAIL moves on to the next position
+     * @throws IllegalArgumentException if {@code rows} and {@code actions} differ in size
+     */
+    FailingRecordReader(List<GenericRow> rows, List<Action> actions, boolean advanceOnFetchFail) {
+      // Every row needs a scripted action; fail fast here rather than with an IndexOutOfBoundsException mid-test.
+      if (rows.size() != actions.size()) {
+        throw new IllegalArgumentException(
+            "rows and actions must have the same size, got " + rows.size() + " and " + actions.size());
+      }
+      _rows = rows;
+      _actions = actions;
+      _advanceOnFetchFail = advanceOnFetchFail;
+      _index = 0;
+    }
+
+    @Override
+    public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
+        throws IOException {
+      // No-op for test; state is set via constructor.
+    }
+
+    @Override
+    public boolean hasNext() {
+      return _index < _rows.size();
+    }
+
+    /**
+     * Applies the scripted action for the current position.
+     *
+     * @throws RecordFetchException on FETCH_FAIL
+     * @throws IOException on PARSE_FAIL
+     * @throws IllegalStateException if called after the last position
+     */
+    @Override
+    public GenericRow next(GenericRow reuse)
+        throws IOException {
+      if (_index >= _rows.size()) {
+        throw new IllegalStateException("No more records");
+      }
+      // Remember the position being served so error messages name it even after the index has advanced.
+      int position = _index;
+      Action action = _actions.get(position);
+      switch (action) {
+        case SUCCESS:
+          reuse.init(_rows.get(position));
+          _index++;
+          return reuse;
+        case FETCH_FAIL:
+          if (_advanceOnFetchFail) {
+            _index++;
+          }
+          throw new RecordFetchException("Simulated fetch failure at index " + position);
+        case PARSE_FAIL:
+          // Parse failures always consume the record.
+          _index++;
+          throw new IOException("Simulated parse failure");
+        default:
+          throw new IllegalStateException("Unknown action: " + action);
+      }
+    }
+
+    @Override
+    public void rewind()
+        throws IOException {
+      _index = 0;
+    }
+
+    @Override
+    public void close() {
+      // No-op.
+    }
+  }
+
+  /**
+   * Builds a TableConfig with continueOnError=true and the given max consecutive fetch failures threshold.
+   *
+   * @param maxConsecutiveRecordFetchFailuresAllowed value stored in the ingestion config of the returned table config
+   * @return a freshly built TableConfig carrying its own IngestionConfig
+   */
+  private static TableConfig getTableConfigWithContinueOnError(int maxConsecutiveRecordFetchFailuresAllowed) {
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setContinueOnError(true);
+    ingestionConfig.setMaxConsecutiveRecordFetchFailuresAllowed(maxConsecutiveRecordFetchFailuresAllowed);
+    // Set the ingestion config on the built TableConfig instead of on TABLE_CONFIG_BUILDER, so the builder is not
+    // mutated by this helper. NOTE(review): TABLE_CONFIG_BUILDER looks like a builder shared by the whole test
+    // class - confirm; if so, mutating it would leak continueOnError into configs built by other tests.
+    TableConfig tableConfig = TABLE_CONFIG_BUILDER.build();
+    tableConfig.setIngestionConfig(ingestionConfig);
+    return tableConfig;
+  }
+
+  /**
+   * Turns raw value triples into GenericRows for the test schema: element 0 is stored as "campaign",
+   * element 1 as "clicks" and element 2 as "ts".
+   */
+  private static List<GenericRow> rows(Object[]... raw) {
+    List<GenericRow> genericRows = new ArrayList<>();
+    for (Object[] values : raw) {
+      GenericRow genericRow = new GenericRow();
+      genericRow.putValue("campaign", values[0]);
+      genericRow.putValue("clicks", values[1]);
+      genericRow.putValue("ts", values[2]);
+      genericRows.add(genericRow);
+    }
+    return genericRows;
+  }
+
+  @Test
+  public void testContinueOnErrorIntermittentFetchFailures()
+      throws Exception {
+    // Each fetch failure is followed by a successful read, so the consecutive-failure counter is reset every time
+    // and never reaches the configured threshold of 3.
+    List<GenericRow> inputRows = rows(
+        new Object[]{"x", 0, 1597719600000L},
+        new Object[]{"a", 1, 1597719600000L},
+        new Object[]{"x", 0, 1597773600000L},
+        new Object[]{"b", 2, 1597773600000L},
+        new Object[]{"x", 0, 1597777200000L},
+        new Object[]{"c", 3, 1597777200000L});
+    List<FailingRecordReader.Action> script = Arrays.asList(
+        FailingRecordReader.Action.FETCH_FAIL,
+        FailingRecordReader.Action.SUCCESS,
+        FailingRecordReader.Action.FETCH_FAIL,
+        FailingRecordReader.Action.SUCCESS,
+        FailingRecordReader.Action.FETCH_FAIL,
+        FailingRecordReader.Action.SUCCESS);
+    // advanceOnFetchFail=true: a failed fetch still moves the reader on to the next position.
+    FailingRecordReader failingReader = new FailingRecordReader(inputRows, script, true);
+
+    SegmentProcessorConfig processorConfig = new SegmentProcessorConfig.Builder()
+        .setTableConfig(getTableConfigWithContinueOnError(3))
+        .setSchema(getSchema())
+        .build();
+
+    File outputDir = new File(TEMP_DIR, "mapper_output_intermittent");
+    FileUtils.deleteQuietly(outputDir);
+    assertTrue(outputDir.mkdirs());
+
+    List<RecordReaderFileConfig> readerConfigs = Collections.singletonList(new RecordReaderFileConfig(failingReader));
+    SegmentMapper mapper = new SegmentMapper(readerConfigs, Collections.emptyList(), processorConfig, outputDir);
+    Map<String, GenericRowFileManager> fileManagers = mapper.map();
+
+    // The map phase completes, and only the 3 successfully read rows are written.
+    assertEquals(fileManagers.size(), 1);
+    GenericRowFileManager onlyFileManager = fileManagers.get("0");
+    assertNotNull(onlyFileManager);
+    assertEquals(onlyFileManager.getFileReader().getNumRows(), 3);
+    onlyFileManager.cleanUp();
+  }
+
+  @Test
+  public void testContinueOnErrorConsistentFetchFailures() {
+    // The reader fails every fetch and never advances (advanceOnFetchFail=false), so the mapper must give up once
+    // maxConsecutiveRecordFetchFailuresAllowed (3) consecutive failures have been seen.
+    List<GenericRow> inputRows = rows(new Object[]{"a", 1, 1597719600000L});
+    List<FailingRecordReader.Action> script = Collections.singletonList(FailingRecordReader.Action.FETCH_FAIL);
+    FailingRecordReader stuckReader = new FailingRecordReader(inputRows, script, false);
+
+    SegmentProcessorConfig processorConfig = new SegmentProcessorConfig.Builder()
+        .setTableConfig(getTableConfigWithContinueOnError(3))
+        .setSchema(getSchema())
+        .build();
+
+    File outputDir = new File(TEMP_DIR, "mapper_output_consistent");
+    FileUtils.deleteQuietly(outputDir);
+    assertTrue(outputDir.mkdirs());
+
+    List<RecordReaderFileConfig> readerConfigs = Collections.singletonList(new RecordReaderFileConfig(stuckReader));
+    SegmentMapper mapper = new SegmentMapper(readerConfigs, Collections.emptyList(), processorConfig, outputDir);
+
+    String failureMessage;
+    try {
+      mapper.map();
+      // Reaching this line means the short-circuit did not trigger.
+      fail("Expected RuntimeException due to consecutive fetch failures");
+      return;
+    } catch (Exception e) {
+      failureMessage = e.getMessage();
+    }
+    assertTrue(failureMessage.contains("consecutive fetch failures"),
+        "Expected message about consecutive fetch failures: " + failureMessage);
+    assertTrue(failureMessage.contains("Stopping at record reader"),
+        "Expected message about stopping record reader: " + failureMessage);
+  }
+
+  @Test
+  public void testContinueOnErrorParseFailures()
+      throws Exception {
+    // Parse failures are not RecordFetchExceptions, so they never count toward the consecutive fetch threshold;
+    // the mapper keeps going and records each one as an incomplete row.
+    List<GenericRow> inputRows = rows(
+        new Object[]{"x", 0, 1597719600000L},
+        new Object[]{"a", 1, 1597719600000L},
+        new Object[]{"x", 0, 1597773600000L},
+        new Object[]{"b", 2, 1597773600000L},
+        new Object[]{"x", 0, 1597777200000L},
+        new Object[]{"c", 3, 1597777200000L});
+    List<FailingRecordReader.Action> script = Arrays.asList(
+        FailingRecordReader.Action.PARSE_FAIL,
+        FailingRecordReader.Action.SUCCESS,
+        FailingRecordReader.Action.PARSE_FAIL,
+        FailingRecordReader.Action.SUCCESS,
+        FailingRecordReader.Action.PARSE_FAIL,
+        FailingRecordReader.Action.SUCCESS);
+    FailingRecordReader failingReader = new FailingRecordReader(inputRows, script, true);
+
+    SegmentProcessorConfig processorConfig = new SegmentProcessorConfig.Builder()
+        .setTableConfig(getTableConfigWithContinueOnError(3))
+        .setSchema(getSchema())
+        .build();
+
+    File outputDir = new File(TEMP_DIR, "mapper_output_parse");
+    FileUtils.deleteQuietly(outputDir);
+    assertTrue(outputDir.mkdirs());
+
+    List<RecordReaderFileConfig> readerConfigs = Collections.singletonList(new RecordReaderFileConfig(failingReader));
+    SegmentMapper mapper = new SegmentMapper(readerConfigs, Collections.emptyList(), processorConfig, outputDir);
+    Map<String, GenericRowFileManager> fileManagers = mapper.map();
+
+    // The map phase completes: 3 rows written, 3 counted as incomplete (the parse failures).
+    assertEquals(fileManagers.size(), 1);
+    GenericRowFileManager onlyFileManager = fileManagers.get("0");
+    assertNotNull(onlyFileManager);
+    assertEquals(onlyFileManager.getFileReader().getNumRows(), 3);
+    assertEquals(mapper.getIncompleteRowsFound(), 3);
+    onlyFileManager.cleanUp();
+  }
+
   @AfterClass
   public void tearDown() {
     FileUtils.deleteQuietly(TEMP_DIR);
diff --git 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReader.java
 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReader.java
index dd9ab6d3758..729849d04be 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReader.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReader.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 
@@ -64,7 +65,13 @@ public class AvroRecordReader implements RecordReader {
   @Override
   public GenericRow next(GenericRow reuse)
       throws IOException {
-    _reusableAvroRecord = _avroReader.next(_reusableAvroRecord);
+    // Record fetch: read next Avro record from stream.
+    try {
+      _reusableAvroRecord = _avroReader.next(_reusableAvroRecord);
+    } catch (IOException e) {
+      throw new RecordFetchException("Failed to read next Avro record", e);
+    }
+    // Data parsing: extract into GenericRow.
     _recordExtractor.extract(_reusableAvroRecord, reuse);
     return reuse;
   }
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
index 68958480f95..42c2ab24068 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
@@ -35,6 +35,7 @@ import org.apache.commons.csv.CSVRecord;
 import org.apache.commons.csv.QuoteMode;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.data.readers.RecordReaderUtils;
@@ -217,10 +218,17 @@ public class CSVRecordReader implements RecordReader {
       LOGGER.warn("Caught exception while reading CSV file: {}, recovering 
from line: {}", _dataFile, _numSkippedLines,
           exception);
 
-      throw exception;
+      throw new RecordFetchException("Failed to read CSV record", exception);
     }
 
-    CSVRecord record = _iterator.next();
+    // Record fetch: read next CSV record from stream.
+    CSVRecord record;
+    try {
+      record = _iterator.next();
+    } catch (RuntimeException e) {
+      throw new RecordFetchException("Failed to read next CSV record", e);
+    }
+    // Data parsing: extract into GenericRow.
     _recordExtractor.extract(record, reuse);
     _nextLineId = _numSkippedLines + (int) _parser.getCurrentLineNumber();
     return reuse;
diff --git 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
index 12303657b03..daba502b285 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.plugin.inputformat.csv;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -435,7 +434,7 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"100", "name", "John"));
     // Second line is unparseable, should throw exception when next() is 
called, and being skipped
     assertTrue(recordReader.hasNext());
-    assertThrows(UncheckedIOException.class, recordReader::next);
+    assertThrows(IOException.class, recordReader::next);
     // Third line is parseable
     assertTrue(recordReader.hasNext());
     assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"102", "name", "Alice"));
@@ -482,7 +481,7 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"100", "name", "John"));
     // Second line is unparseable, should throw exception when next() is 
called, and being skipped
     assertTrue(recordReader.hasNext());
-    assertThrows(UncheckedIOException.class, recordReader::next);
+    assertThrows(IOException.class, recordReader::next);
     // 2 lines in total
     assertFalse(recordReader.hasNext());
   }
@@ -528,7 +527,7 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     assertEquals(recordReader.next().getFieldToValueMap(),
         createMap("id", "101", "firstName", "john", "lastName", "doe", 
"appVersion", "1.0.1", "active", "yes"));
     assertTrue(recordReader.hasNext());
-    assertThrows(UncheckedIOException.class, recordReader::next);
+    assertThrows(IOException.class, recordReader::next);
     assertFalse(recordReader.hasNext());
   }
 
@@ -578,9 +577,9 @@ public class CSVRecordReaderTest extends 
AbstractRecordReaderTest {
     assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"103", "name", "Suzanne"));
     // NOTE: Here we need to skip twice because the first line is a comment 
line
     assertTrue(recordReader.hasNext());
-    assertThrows(UncheckedIOException.class, recordReader::next);
+    assertThrows(IOException.class, recordReader::next);
     assertTrue(recordReader.hasNext());
-    assertThrows(UncheckedIOException.class, recordReader::next);
+    assertThrows(IOException.class, recordReader::next);
     assertTrue(recordReader.hasNext());
     assertEquals(recordReader.next().getFieldToValueMap(), createMap("id", 
"105", "name", "Zack\nZack"));
     assertTrue(recordReader.hasNext());
diff --git 
a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReader.java
 
b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReader.java
index 6c739fd501b..14a30d93ba3 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReader.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReader.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.data.readers.RecordReaderUtils;
@@ -78,8 +79,16 @@ public class JSONRecordReader implements RecordReader {
   }
 
   @Override
-  public GenericRow next(GenericRow reuse) {
-    Map<String, Object> record = _iterator.next();
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    // Record fetch: read next JSON record from stream.
+    Map<String, Object> record;
+    try {
+      record = _iterator.next();
+    } catch (Exception e) {
+      throw new RecordFetchException("Failed to read next JSON record", e);
+    }
+    // Data parsing: extract into GenericRow.
     _recordExtractor.extract(record, reuse);
     return reuse;
   }
diff --git 
a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
 
b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
index ef5d2a934b9..52f891cc604 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
@@ -44,6 +44,7 @@ import org.apache.orc.OrcFile;
 import org.apache.orc.Reader;
 import org.apache.orc.TypeDescription;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.data.readers.RecordReaderUtils;
@@ -180,6 +181,7 @@ public class ORCRecordReader implements RecordReader {
   @Override
   public GenericRow next(GenericRow reuse)
       throws IOException {
+    // Data parsing: extract current batch row into GenericRow.
     int numFields = _orcFields.size();
     for (int i = 0; i < numFields; i++) {
       if (!_includeOrcFields[i]) {
@@ -190,8 +192,13 @@ public class ORCRecordReader implements RecordReader {
       reuse.putValue(field, extractValue(field, _rowBatch.cols[i], fieldType, 
_nextRowId));
     }
 
+    // Record fetch: read next batch when current one is exhausted.
     if (_nextRowId == _rowBatch.size - 1) {
-      _hasNext = _orcRecordReader.nextBatch(_rowBatch);
+      try {
+        _hasNext = _orcRecordReader.nextBatch(_rowBatch);
+      } catch (IOException e) {
+        throw new RecordFetchException("Failed to read next ORC record", e);
+      }
       _nextRowId = 0;
     } else {
       _nextRowId++;
diff --git 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
index 9ff0c4e61be..80906a34d92 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
@@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.data.readers.RecordReaderUtils;
@@ -65,8 +66,14 @@ public class ParquetAvroRecordReader implements RecordReader 
{
   @Override
   public GenericRow next(GenericRow reuse)
       throws IOException {
+    // Data parsing: extract current record into GenericRow.
     _recordExtractor.extract(_nextRecord, reuse);
-    _nextRecord = _parquetReader.read();
+    // Record fetch: read next Parquet Avro record.
+    try {
+      _nextRecord = _parquetReader.read();
+    } catch (IOException e) {
+      throw new RecordFetchException("Failed to read next Parquet Avro 
record", e);
+    }
     return reuse;
   }
 
diff --git 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
index ebb585bfe1b..a797643c5b1 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
@@ -35,6 +35,7 @@ import org.apache.parquet.io.ColumnIOFactory;
 import org.apache.parquet.io.MessageColumnIO;
 import org.apache.parquet.schema.MessageType;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.data.readers.RecordReaderUtils;
@@ -112,7 +113,13 @@ public class ParquetNativeRecordReader implements 
RecordReader {
   @Override
   public GenericRow next(GenericRow reuse)
       throws IOException {
-    _nextRecord = _parquetRecordReader.read();
+    // Record fetch: read next record from Parquet page.
+    try {
+      _nextRecord = _parquetRecordReader.read();
+    } catch (Exception e) {
+      throw new RecordFetchException("Failed to read next Parquet native 
record", e);
+    }
+    // Data parsing: extract into GenericRow.
     _recordExtractor.extract(_nextRecord, reuse);
     _currentPageIdx++;
     return reuse;
diff --git 
a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReader.java
 
b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReader.java
index 90485d1090b..edd76987b7b 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReader.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReader.java
@@ -29,6 +29,7 @@ import java.io.InputStream;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.data.readers.RecordReaderUtils;
@@ -99,14 +100,16 @@ public class ProtoBufRecordReader implements RecordReader {
   public GenericRow next(GenericRow reuse)
       throws IOException {
     Message message;
+    // Record fetch: read next protobuf message from stream.
     try {
       _builder.mergeDelimitedFrom(_inputStream);
       message = _builder.build();
     } catch (Exception e) {
-      throw new IOException("Caught exception while reading protobuf object", e);
+      throw new RecordFetchException("Caught exception while reading protobuf object", e);
     } finally {
       _builder.clear();
     }
+    // Data parsing: extract into GenericRow.
     _recordExtractor.extract(message, reuse);
     _hasNext = hasMoreToRead();
     return reuse;
diff --git a/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReader.java b/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReader.java
index 1d8de8fa698..d28c21f56d7 100644
--- a/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReader.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderConfig;
 import org.apache.pinot.spi.data.readers.RecordReaderUtils;
@@ -110,7 +111,7 @@ public class ThriftRecordReader implements RecordReader {
       tObject = (TBase) _thriftClass.newInstance();
       tObject.read(_tProtocol);
     } catch (Exception e) {
-      throw new IOException("Caught exception while reading thrift object", e);
+      throw new RecordFetchException("Caught exception while reading thrift object", e);
     }
     _recordExtractor.extract(tObject, reuse);
     _hasNext = hasMoreToRead();
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
index e44d0b89246..c126c9b0487 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
@@ -67,6 +67,9 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Configs related to skip any row which has error and continue during ingestion")
   private boolean _continueOnError;
 
+  @JsonPropertyDescription("Max consecutive failures allowed while fetching record from source.")
+  private int _maxConsecutiveRecordFetchFailuresAllowed;
+
   @JsonPropertyDescription(
       "Configs related to retry segment build on reduced size when previous build fails on Preconditions check")
   private boolean _retryOnSegmentBuildPrecheckFailure;
@@ -161,6 +164,10 @@ public class IngestionConfig extends BaseJsonConfig {
     return _ingestionExceptionLogRateLimitPerMin;
   }
 
+  public int getMaxConsecutiveRecordFetchFailuresAllowed() {
+    return _maxConsecutiveRecordFetchFailuresAllowed;
+  }
+
   public void setBatchIngestionConfig(BatchIngestionConfig batchIngestionConfig) {
     _batchIngestionConfig = batchIngestionConfig;
   }
@@ -213,4 +220,8 @@ public class IngestionConfig extends BaseJsonConfig {
   public void setIngestionExceptionLogRateLimitPerMin(int ingestionExceptionLogRateLimitPerMin) {
     _ingestionExceptionLogRateLimitPerMin = ingestionExceptionLogRateLimitPerMin;
   }
+
+  public void setMaxConsecutiveRecordFetchFailuresAllowed(int maxConsecutiveRecordFetchFailuresAllowed) {
+    _maxConsecutiveRecordFetchFailuresAllowed = maxConsecutiveRecordFetchFailuresAllowed;
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordFetchException.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordFetchException.java
new file mode 100644
index 00000000000..4c5371de127
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordFetchException.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.io.IOException;
+
+
+/**
+ * Exception indicating an I/O or data fetch error that may prevent the RecordReader
+ * from advancing its pointer to the next record. This exception should be thrown when
+ * the RecordReader encounters an error during data fetching that prevents it from
+ * moving forward, potentially causing an infinite loop if the same record is repeatedly
+ * attempted.
+ * <p>
+ * When this exception is thrown and continueOnError is enabled, it will count toward
+ * the consecutive failure threshold to prevent infinite loops.
+ */
+public class RecordFetchException extends IOException {
+
+  private static final long serialVersionUID = 1L;
+
+  public RecordFetchException(String message) {
+    super(message);
+  }
+
+  public RecordFetchException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public RecordFetchException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
index db3a455da34..5b0a57f39d4 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
@@ -30,6 +30,17 @@ import javax.annotation.Nullable;
  * The <code>RecordReader</code> interface is used to read records from various file formats into {@link GenericRow}s.
  * Pinot segments will be generated from {@link GenericRow}s.
  * <p>NOTE: for time column, record reader should be able to read both incoming and outgoing time
+ * <p>
+ * <h2>Exception Handling</h2>
+ * <p>When implementing {@link #next(GenericRow)}, implementations should use appropriate exception types to indicate
+ * the nature of the error:
+ * <ul>
+ *   <li><b>{@link RecordFetchException}</b>: Throw when encountering I/O or data fetch errors that may prevent
+ *       the reader from advancing its pointer to the next record. These errors count toward the consecutive failure
+ *       threshold to prevent infinite loops.</li>
+ *   <li><b>{@link IOException}</b>: For backward compatibility, IOException is treated as a parse error and can be
+ *       skipped indefinitely.</li>
+ * </ul>
  */
 public interface RecordReader extends Closeable, Serializable {
 
@@ -54,6 +65,18 @@ public interface RecordReader extends Closeable, Serializable {
    * Get the next record.
    * <p>This method should be called only if {@link #hasNext()} returns <code>true</code>. Caller is responsible for
    * handling exceptions from this method and skip the row if user wants to continue reading the remaining rows.
+   * <p>
+   * <b>Exception Guidelines:</b>
+   * <ul>
+   *   <li>Throw {@link RecordFetchException} if the reader cannot advance due to I/O or fetch errors</li>
+   *   <li>Throw {@link IOException} if data is corrupt but the reader can advance to the next record</li>
+   * </ul>
+   * <b>Important:</b> If an exception is thrown, the implementation should ensure that either:
+   * <ul>
+   *   <li>The reader's pointer advances to the next record (for parse errors), OR</li>
+   *   <li>The reader throws RecordFetchException to indicate it cannot advance (for fetch errors)</li>
+   * </ul>
+   * This prevents infinite loops when the same record is repeatedly attempted.
    */
   default GenericRow next()
       throws IOException {
@@ -65,7 +88,21 @@ public interface RecordReader extends Closeable, Serializable {
    * <p>The passed in row should be cleared before calling this method.
    * <p>This method should be called only if {@link #hasNext()} returns <code>true</code>. Caller is responsible for
    * handling exceptions from this method and skip the row if user wants to continue reading the remaining rows.
-   *
+   * <p>
+   * <b>Exception Guidelines:</b>
+   * <ul>
+   *   <li>Throw {@link RecordFetchException} if the reader cannot advance due to I/O or fetch errors (e.g., network
+   *       failure, file system error, stream corruption that prevents reading). These errors count toward the
+   *       consecutive failure threshold.</li>
+   *   <li>Throw {@link IOException} if data is corrupt or malformed but the reader can advance to the next
+   *       record (e.g., invalid format, type conversion failure, schema mismatch).</li>
+   * </ul>
+   * <b>Important:</b> If an exception is thrown, the implementation should ensure that either:
+   * <ul>
+   *   <li>The reader's pointer advances to the next record (for parse errors), OR</li>
+   *   <li>The reader throws RecordFetchException to indicate it cannot advance (for fetch errors)</li>
+   * </ul>
+   * This prevents infinite loops when the same record is repeatedly attempted.<br>
    * TODO: Consider clearing the row within the record reader to simplify the caller
    */
   GenericRow next(GenericRow reuse)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to