This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9b35755402d Short circuit mapper if there are consecutive record fetch
failures (#17775)
9b35755402d is described below
commit 9b35755402dcecd0dc9a5ce0a7f8688e4a328201
Author: Shounak kulkarni <[email protected]>
AuthorDate: Fri Feb 27 16:52:49 2026 +0530
Short circuit mapper if there are consecutive record fetch failures (#17775)
---
.../segment/processing/mapper/SegmentMapper.java | 44 ++++-
.../processing/framework/SegmentMapperTest.java | 217 +++++++++++++++++++++
.../plugin/inputformat/avro/AvroRecordReader.java | 9 +-
.../plugin/inputformat/csv/CSVRecordReader.java | 12 +-
.../inputformat/csv/CSVRecordReaderTest.java | 11 +-
.../plugin/inputformat/json/JSONRecordReader.java | 13 +-
.../plugin/inputformat/orc/ORCRecordReader.java | 9 +-
.../parquet/ParquetAvroRecordReader.java | 9 +-
.../parquet/ParquetNativeRecordReader.java | 9 +-
.../inputformat/protobuf/ProtoBufRecordReader.java | 5 +-
.../inputformat/thrift/ThriftRecordReader.java | 3 +-
.../config/table/ingestion/IngestionConfig.java | 11 ++
.../spi/data/readers/RecordFetchException.java | 49 +++++
.../pinot/spi/data/readers/RecordReader.java | 39 +++-
14 files changed, 415 insertions(+), 25 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index 03a25cd964a..2eb73b86bd8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -43,9 +43,11 @@ import
org.apache.pinot.core.segment.processing.utils.SegmentProcessorUtils;
import org.apache.pinot.segment.local.recordtransformer.RecordTransformerUtils;
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.recordtransformer.RecordTransformer;
@@ -177,16 +179,23 @@ public class SegmentMapper {
}
-// Returns true if the map phase can continue, false if it should terminate
based on the configured threshold for
-// intermediate file size during map phase.
+ // Returns true if the map phase can continue, false if it should terminate
based on the configured threshold for
+ // intermediate file size during map phase.
protected boolean completeMapAndTransformRow(RecordReader recordReader,
GenericRow reuse, Consumer<Object> observer,
int count, int totalCount) {
observer.accept(String.format("Doing map phase on data from RecordReader
(%d out of %d)", count, totalCount));
- boolean continueOnError =
- _processorConfig.getTableConfig().getIngestionConfig() != null &&
_processorConfig.getTableConfig()
- .getIngestionConfig().isContinueOnError();
+ boolean continueOnError = false;
+ // skip short-circuiting if maxConsecutiveRecordFetchFailuresAllowed is
not explicitly set
+ int maxConsecutiveRecordFetchFailuresAllowed = 0;
+ IngestionConfig ingestionConfig =
_processorConfig.getTableConfig().getIngestionConfig();
+ if (ingestionConfig != null) {
+ continueOnError = ingestionConfig.isContinueOnError();
+ maxConsecutiveRecordFetchFailuresAllowed =
+ ingestionConfig.getMaxConsecutiveRecordFetchFailuresAllowed();
+ }
+ int consecutiveFetchFailures = 0;
while (recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())) {
try {
reuse = recordReader.next(reuse);
@@ -197,19 +206,38 @@ public class SegmentMapper {
_incompleteRowsFound += result.getIncompleteRowCount();
_skippedRowsFound += result.getSkippedRowCount();
_sanitizedRowsFound += result.getSanitizedRowCount();
+ // Reset consecutive fetch failures counter on successful read
+ consecutiveFetchFailures = 0;
} catch (Exception e) {
- String logMessage = "Caught exception while reading data.";
+ boolean isFetchError = e instanceof RecordFetchException;
+ String errorType = isFetchError ? "fetch" : "parse";
+ String logMessage = String.format("Caught %s exception while reading
data.", errorType);
observer.accept(new MinionTaskBaseObserverStats.StatusEntry.Builder()
.withLevel(MinionTaskBaseObserverStats.StatusEntry.LogLevel.ERROR)
.withStatus(logMessage + " Reason: " + e.getMessage())
.build());
if (!continueOnError) {
throw new RuntimeException(logMessage, e);
+ } else if (isFetchError && maxConsecutiveRecordFetchFailuresAllowed >
0) {
+ consecutiveFetchFailures++;
+ LOGGER.debug("{} Consecutive fetch failures: {}",
consecutiveFetchFailures, logMessage, e);
+ if (consecutiveFetchFailures >=
maxConsecutiveRecordFetchFailuresAllowed) {
+ String warningMessage = String.format(
+ "Stopping at record reader %d out of %d due to %d consecutive
fetch failures. "
+ + "This may indicate the reader is stuck and unable to
advance. Last error: %s",
+ count, totalCount, consecutiveFetchFailures, e.getMessage());
+ observer.accept(new
MinionTaskBaseObserverStats.StatusEntry.Builder()
+
.withLevel(MinionTaskBaseObserverStats.StatusEntry.LogLevel.WARN)
+ .withStatus(warningMessage)
+ .build());
+ LOGGER.warn(warningMessage, e);
+ throw new RuntimeException(warningMessage, e);
+ }
} else {
+ // Parse error - log and continue without counting toward threshold
_throttledLogger.warn(logMessage + "Processing RecordReader " +
count + " out of " + totalCount, e);
- _incompleteRowsFound++;
- continue;
}
+ _incompleteRowsFound++;
}
reuse.clear();
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
index db040232f98..d1eb2f2d964 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentMapperTest.java
@@ -19,13 +19,16 @@
package org.apache.pinot.core.segment.processing.framework;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
import
org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
@@ -44,10 +47,13 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TimestampConfig;
import org.apache.pinot.spi.config.table.TimestampIndexGranularity;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.AfterClass;
@@ -58,6 +64,7 @@ import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
/**
@@ -312,6 +319,216 @@ public class SegmentMapperTest {
return inputs.toArray(new Object[0][]);
}
+ /**
+ * RecordReader that throws fetch or parse errors at configured positions.
+ * Used to test continueOnError and consecutiveFetchFailures short-circuit
logic.
+ */
+ private static final class FailingRecordReader implements RecordReader {
+ enum Action {
+ SUCCESS,
+ FETCH_FAIL,
+ PARSE_FAIL
+ }
+
+ private final List<GenericRow> _rows;
+ private final List<Action> _actions;
+ /**
+ * If false, on FETCH_FAIL we do not advance the index (simulates stuck
reader).
+ * If true, we advance so the next call can succeed (intermittent
failures).
+ */
+ private final boolean _advanceOnFetchFail;
+ private int _index;
+
+ FailingRecordReader(List<GenericRow> rows, List<Action> actions, boolean
advanceOnFetchFail) {
+ _rows = rows;
+ _actions = actions;
+ _advanceOnFetchFail = advanceOnFetchFail;
+ _index = 0;
+ }
+
+ @Override
+ public void init(File dataFile, @Nullable Set<String> fieldsToRead,
@Nullable RecordReaderConfig recordReaderConfig)
+ throws IOException {
+ // No-op for test; state is set via constructor.
+ }
+
+ @Override
+ public boolean hasNext() {
+ return _index < _rows.size();
+ }
+
+ @Override
+ public GenericRow next(GenericRow reuse)
+ throws IOException {
+ if (_index >= _rows.size()) {
+ throw new IllegalStateException("No more records");
+ }
+ Action action = _actions.get(_index);
+ switch (action) {
+ case SUCCESS:
+ reuse.init(_rows.get(_index));
+ _index++;
+ return reuse;
+ case FETCH_FAIL:
+ if (_advanceOnFetchFail) {
+ _index++;
+ }
+ throw new RecordFetchException("Simulated fetch failure at index " +
_index);
+ case PARSE_FAIL:
+ _index++;
+ throw new IOException("Simulated parse failure");
+ default:
+ throw new IllegalStateException("Unknown action: " + action);
+ }
+ }
+
+ @Override
+ public void rewind()
+ throws IOException {
+ _index = 0;
+ }
+
+ @Override
+ public void close() {
+ // No-op.
+ }
+ }
+
+ /**
+ * Builds a TableConfig with continueOnError=true and optional max
consecutive fetch failures.
+ */
+ private static TableConfig getTableConfigWithContinueOnError(int
maxConsecutiveRecordFetchFailuresAllowed) {
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setContinueOnError(true);
+
ingestionConfig.setMaxConsecutiveRecordFetchFailuresAllowed(maxConsecutiveRecordFetchFailuresAllowed);
+ return TABLE_CONFIG_BUILDER.setIngestionConfig(ingestionConfig).build();
+ }
+
+ /**
+ * Creates a list of GenericRows suitable for the test schema (campaign,
clicks, ts).
+ */
+ private static List<GenericRow> rows(Object[]... raw) {
+ List<GenericRow> list = new ArrayList<>();
+ for (Object[] r : raw) {
+ GenericRow row = new GenericRow();
+ row.putValue("campaign", r[0]);
+ row.putValue("clicks", r[1]);
+ row.putValue("ts", r[2]);
+ list.add(row);
+ }
+ return list;
+ }
+
+ @Test
+ public void testContinueOnErrorIntermittentFetchFailures()
+ throws Exception {
+ // Intermittent fetch failures: fail then succeed repeatedly. Counter
resets on success, so we never hit threshold.
+ List<GenericRow> data = rows(
+ new Object[]{"x", 0, 1597719600000L},
+ new Object[]{"a", 1, 1597719600000L},
+ new Object[]{"x", 0, 1597773600000L},
+ new Object[]{"b", 2, 1597773600000L},
+ new Object[]{"x", 0, 1597777200000L},
+ new Object[]{"c", 3, 1597777200000L});
+ List<FailingRecordReader.Action> actions = Arrays.asList(
+ FailingRecordReader.Action.FETCH_FAIL,
+ FailingRecordReader.Action.SUCCESS,
+ FailingRecordReader.Action.FETCH_FAIL,
+ FailingRecordReader.Action.SUCCESS,
+ FailingRecordReader.Action.FETCH_FAIL,
+ FailingRecordReader.Action.SUCCESS);
+ FailingRecordReader reader = new FailingRecordReader(data, actions, true);
+
+ TableConfig tableConfig = getTableConfigWithContinueOnError(3);
+ SegmentProcessorConfig processorConfig =
+ new
SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(getSchema()).build();
+ File mapperOutputDir = new File(TEMP_DIR, "mapper_output_intermittent");
+ FileUtils.deleteQuietly(mapperOutputDir);
+ assertTrue(mapperOutputDir.mkdirs());
+
+ SegmentMapper segmentMapper =
+ new SegmentMapper(Collections.singletonList(new
RecordReaderFileConfig(reader)),
+ Collections.emptyList(), processorConfig, mapperOutputDir);
+ Map<String, GenericRowFileManager> result = segmentMapper.map();
+
+ // Should complete successfully with 3 rows written (successes only).
+ assertEquals(result.size(), 1);
+ GenericRowFileManager fileManager = result.get("0");
+ assertNotNull(fileManager);
+ assertEquals(fileManager.getFileReader().getNumRows(), 3);
+ fileManager.cleanUp();
+ }
+
+ @Test
+ public void testContinueOnErrorConsistentFetchFailures() {
+ // Consistent fetch failures: reader never advances. After
maxConsecutiveRecordFetchFailuresAllowed we short-circuit
+ List<GenericRow> data = rows(new Object[]{"a", 1, 1597719600000L});
+ List<FailingRecordReader.Action> actions =
Collections.singletonList(FailingRecordReader.Action.FETCH_FAIL);
+ FailingRecordReader reader = new FailingRecordReader(data, actions, false);
+
+ TableConfig tableConfig = getTableConfigWithContinueOnError(3);
+ SegmentProcessorConfig processorConfig =
+ new
SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(getSchema()).build();
+ File mapperOutputDir = new File(TEMP_DIR, "mapper_output_consistent");
+ FileUtils.deleteQuietly(mapperOutputDir);
+ assertTrue(mapperOutputDir.mkdirs());
+
+ SegmentMapper segmentMapper =
+ new SegmentMapper(Collections.singletonList(new
RecordReaderFileConfig(reader)),
+ Collections.emptyList(), processorConfig, mapperOutputDir);
+
+ try {
+ segmentMapper.map();
+ fail("Expected RuntimeException due to consecutive fetch failures");
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("consecutive fetch failures"),
+ "Expected message about consecutive fetch failures: " +
e.getMessage());
+ assertTrue(e.getMessage().contains("Stopping at record reader"),
+ "Expected message about stopping record reader: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testContinueOnErrorParseFailures()
+ throws Exception {
+ // Parse failures do not count toward consecutive fetch threshold; mapper
continues and counts incomplete rows.
+ List<GenericRow> data = rows(
+ new Object[]{"x", 0, 1597719600000L},
+ new Object[]{"a", 1, 1597719600000L},
+ new Object[]{"x", 0, 1597773600000L},
+ new Object[]{"b", 2, 1597773600000L},
+ new Object[]{"x", 0, 1597777200000L},
+ new Object[]{"c", 3, 1597777200000L});
+ List<FailingRecordReader.Action> actions = Arrays.asList(
+ FailingRecordReader.Action.PARSE_FAIL,
+ FailingRecordReader.Action.SUCCESS,
+ FailingRecordReader.Action.PARSE_FAIL,
+ FailingRecordReader.Action.SUCCESS,
+ FailingRecordReader.Action.PARSE_FAIL,
+ FailingRecordReader.Action.SUCCESS);
+ FailingRecordReader reader = new FailingRecordReader(data, actions, true);
+
+ TableConfig tableConfig = getTableConfigWithContinueOnError(3);
+ SegmentProcessorConfig processorConfig =
+ new
SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(getSchema()).build();
+ File mapperOutputDir = new File(TEMP_DIR, "mapper_output_parse");
+ FileUtils.deleteQuietly(mapperOutputDir);
+ assertTrue(mapperOutputDir.mkdirs());
+
+ SegmentMapper segmentMapper =
+ new SegmentMapper(Collections.singletonList(new
RecordReaderFileConfig(reader)),
+ Collections.emptyList(), processorConfig, mapperOutputDir);
+ Map<String, GenericRowFileManager> result = segmentMapper.map();
+
+ // Should complete; 3 rows written, 3 incomplete (parse failures).
+ assertEquals(result.size(), 1);
+ GenericRowFileManager fileManager = result.get("0");
+ assertNotNull(fileManager);
+ assertEquals(fileManager.getFileReader().getNumRows(), 3);
+ assertEquals(segmentMapper.getIncompleteRowsFound(), 3);
+ fileManager.cleanUp();
+ }
+
@AfterClass
public void tearDown() {
FileUtils.deleteQuietly(TEMP_DIR);
diff --git
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReader.java
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReader.java
index dd9ab6d3758..729849d04be 100644
---
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReader.java
+++
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReader.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericRecord;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
@@ -64,7 +65,13 @@ public class AvroRecordReader implements RecordReader {
@Override
public GenericRow next(GenericRow reuse)
throws IOException {
- _reusableAvroRecord = _avroReader.next(_reusableAvroRecord);
+ // Record fetch: read next Avro record from stream.
+ try {
+ _reusableAvroRecord = _avroReader.next(_reusableAvroRecord);
+ } catch (IOException e) {
+ throw new RecordFetchException("Failed to read next Avro record", e);
+ }
+ // Data parsing: extract into GenericRow.
_recordExtractor.extract(_reusableAvroRecord, reuse);
return reuse;
}
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
index 68958480f95..42c2ab24068 100644
---
a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
@@ -35,6 +35,7 @@ import org.apache.commons.csv.CSVRecord;
import org.apache.commons.csv.QuoteMode;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;
@@ -217,10 +218,17 @@ public class CSVRecordReader implements RecordReader {
LOGGER.warn("Caught exception while reading CSV file: {}, recovering
from line: {}", _dataFile, _numSkippedLines,
exception);
- throw exception;
+ throw new RecordFetchException("Failed to read CSV record", exception);
}
- CSVRecord record = _iterator.next();
+ // Record fetch: read next CSV record from stream.
+ CSVRecord record;
+ try {
+ record = _iterator.next();
+ } catch (RuntimeException e) {
+ throw new RecordFetchException("Failed to read next CSV record", e);
+ }
+ // Data parsing: extract into GenericRow.
_recordExtractor.extract(record, reuse);
_nextLineId = _numSkippedLines + (int) _parser.getCurrentLineNumber();
return reuse;
diff --git
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
index 12303657b03..daba502b285 100644
---
a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
+++
b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.plugin.inputformat.csv;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
-import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -435,7 +434,7 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"100", "name", "John"));
// Second line is unparseable, should throw exception when next() is
called, and being skipped
assertTrue(recordReader.hasNext());
- assertThrows(UncheckedIOException.class, recordReader::next);
+ assertThrows(IOException.class, recordReader::next);
// Third line is parseable
assertTrue(recordReader.hasNext());
assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"102", "name", "Alice"));
@@ -482,7 +481,7 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"100", "name", "John"));
// Second line is unparseable, should throw exception when next() is
called, and being skipped
assertTrue(recordReader.hasNext());
- assertThrows(UncheckedIOException.class, recordReader::next);
+ assertThrows(IOException.class, recordReader::next);
// 2 lines in total
assertFalse(recordReader.hasNext());
}
@@ -528,7 +527,7 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
assertEquals(recordReader.next().getFieldToValueMap(),
createMap("id", "101", "firstName", "john", "lastName", "doe",
"appVersion", "1.0.1", "active", "yes"));
assertTrue(recordReader.hasNext());
- assertThrows(UncheckedIOException.class, recordReader::next);
+ assertThrows(IOException.class, recordReader::next);
assertFalse(recordReader.hasNext());
}
@@ -578,9 +577,9 @@ public class CSVRecordReaderTest extends
AbstractRecordReaderTest {
assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"103", "name", "Suzanne"));
// NOTE: Here we need to skip twice because the first line is a comment
line
assertTrue(recordReader.hasNext());
- assertThrows(UncheckedIOException.class, recordReader::next);
+ assertThrows(IOException.class, recordReader::next);
assertTrue(recordReader.hasNext());
- assertThrows(UncheckedIOException.class, recordReader::next);
+ assertThrows(IOException.class, recordReader::next);
assertTrue(recordReader.hasNext());
assertEquals(recordReader.next().getFieldToValueMap(), createMap("id",
"105", "name", "Zack\nZack"));
assertTrue(recordReader.hasNext());
diff --git
a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReader.java
b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReader.java
index 6c739fd501b..14a30d93ba3 100644
---
a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReader.java
+++
b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReader.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;
@@ -78,8 +79,16 @@ public class JSONRecordReader implements RecordReader {
}
@Override
- public GenericRow next(GenericRow reuse) {
- Map<String, Object> record = _iterator.next();
+ public GenericRow next(GenericRow reuse)
+ throws IOException {
+ // Record fetch: read next JSON record from stream.
+ Map<String, Object> record;
+ try {
+ record = _iterator.next();
+ } catch (Exception e) {
+ throw new RecordFetchException("Failed to read next JSON record", e);
+ }
+ // Data parsing: extract into GenericRow.
_recordExtractor.extract(record, reuse);
return reuse;
}
diff --git
a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
index ef5d2a934b9..52f891cc604 100644
---
a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
+++
b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
@@ -44,6 +44,7 @@ import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;
@@ -180,6 +181,7 @@ public class ORCRecordReader implements RecordReader {
@Override
public GenericRow next(GenericRow reuse)
throws IOException {
+ // Data parsing: extract current batch row into GenericRow.
int numFields = _orcFields.size();
for (int i = 0; i < numFields; i++) {
if (!_includeOrcFields[i]) {
@@ -190,8 +192,13 @@ public class ORCRecordReader implements RecordReader {
reuse.putValue(field, extractValue(field, _rowBatch.cols[i], fieldType,
_nextRowId));
}
+ // Record fetch: read next batch when current one is exhausted.
if (_nextRowId == _rowBatch.size - 1) {
- _hasNext = _orcRecordReader.nextBatch(_rowBatch);
+ try {
+ _hasNext = _orcRecordReader.nextBatch(_rowBatch);
+ } catch (IOException e) {
+ throw new RecordFetchException("Failed to read next ORC record", e);
+ }
_nextRowId = 0;
} else {
_nextRowId++;
diff --git
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
index 9ff0c4e61be..80906a34d92 100644
---
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
+++
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetAvroRecordReader.java
@@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;
@@ -65,8 +66,14 @@ public class ParquetAvroRecordReader implements RecordReader
{
@Override
public GenericRow next(GenericRow reuse)
throws IOException {
+ // Data parsing: extract current record into GenericRow.
_recordExtractor.extract(_nextRecord, reuse);
- _nextRecord = _parquetReader.read();
+ // Record fetch: read next Parquet Avro record.
+ try {
+ _nextRecord = _parquetReader.read();
+ } catch (IOException e) {
+ throw new RecordFetchException("Failed to read next Parquet Avro
record", e);
+ }
return reuse;
}
diff --git
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
index ebb585bfe1b..a797643c5b1 100644
---
a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
+++
b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetNativeRecordReader.java
@@ -35,6 +35,7 @@ import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.schema.MessageType;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;
@@ -112,7 +113,13 @@ public class ParquetNativeRecordReader implements
RecordReader {
@Override
public GenericRow next(GenericRow reuse)
throws IOException {
- _nextRecord = _parquetRecordReader.read();
+ // Record fetch: read next record from Parquet page.
+ try {
+ _nextRecord = _parquetRecordReader.read();
+ } catch (Exception e) {
+ throw new RecordFetchException("Failed to read next Parquet native
record", e);
+ }
+ // Data parsing: extract into GenericRow.
_recordExtractor.extract(_nextRecord, reuse);
_currentPageIdx++;
return reuse;
diff --git
a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReader.java
b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReader.java
index 90485d1090b..edd76987b7b 100644
---
a/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReader.java
+++
b/pinot-plugins/pinot-input-format/pinot-protobuf/src/main/java/org/apache/pinot/plugin/inputformat/protobuf/ProtoBufRecordReader.java
@@ -29,6 +29,7 @@ import java.io.InputStream;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;
@@ -99,14 +100,16 @@ public class ProtoBufRecordReader implements RecordReader {
public GenericRow next(GenericRow reuse)
throws IOException {
Message message;
+ // Record fetch: read next protobuf message from stream.
try {
_builder.mergeDelimitedFrom(_inputStream);
message = _builder.build();
} catch (Exception e) {
- throw new IOException("Caught exception while reading protobuf object",
e);
+ throw new RecordFetchException("Caught exception while reading protobuf
object", e);
} finally {
_builder.clear();
}
+ // Data parsing: extract into GenericRow.
_recordExtractor.extract(message, reuse);
_hasNext = hasMoreToRead();
return reuse;
diff --git
a/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReader.java
b/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReader.java
index 1d8de8fa698..d28c21f56d7 100644
---
a/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReader.java
+++
b/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReader.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordFetchException;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;
@@ -110,7 +111,7 @@ public class ThriftRecordReader implements RecordReader {
tObject = (TBase) _thriftClass.newInstance();
tObject.read(_tProtocol);
} catch (Exception e) {
- throw new IOException("Caught exception while reading thrift object", e);
+ throw new RecordFetchException("Caught exception while reading thrift
object", e);
}
_recordExtractor.extract(tObject, reuse);
_hasNext = hasMoreToRead();
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
index e44d0b89246..c126c9b0487 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
@@ -67,6 +67,9 @@ public class IngestionConfig extends BaseJsonConfig {
@JsonPropertyDescription("Configs related to skip any row which has error and continue during ingestion")
private boolean _continueOnError;
+ @JsonPropertyDescription("Max consecutive failures allowed while fetching record from source.")
+ private int _maxConsecutiveRecordFetchFailuresAllowed;
+
@JsonPropertyDescription(
"Configs related to retry segment build on reduced size when previous build fails on Preconditions check")
private boolean _retryOnSegmentBuildPrecheckFailure;
@@ -161,6 +164,10 @@ public class IngestionConfig extends BaseJsonConfig {
return _ingestionExceptionLogRateLimitPerMin;
}
+ public int getMaxConsecutiveRecordFetchFailuresAllowed() {
+ return _maxConsecutiveRecordFetchFailuresAllowed;
+ }
+
public void setBatchIngestionConfig(BatchIngestionConfig batchIngestionConfig) {
_batchIngestionConfig = batchIngestionConfig;
}
@@ -213,4 +220,8 @@ public class IngestionConfig extends BaseJsonConfig {
public void setIngestionExceptionLogRateLimitPerMin(int ingestionExceptionLogRateLimitPerMin) {
_ingestionExceptionLogRateLimitPerMin = ingestionExceptionLogRateLimitPerMin;
}
+
+ public void setMaxConsecutiveRecordFetchFailuresAllowed(int maxConsecutiveRecordFetchFailuresAllowed) {
+ _maxConsecutiveRecordFetchFailuresAllowed = maxConsecutiveRecordFetchFailuresAllowed;
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordFetchException.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordFetchException.java
new file mode 100644
index 00000000000..4c5371de127
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordFetchException.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data.readers;
+
+import java.io.IOException;
+
+
+/**
+ * Exception indicating an I/O or data fetch error that may prevent the RecordReader
+ * from advancing its pointer to the next record. This exception should be thrown when
+ * the RecordReader encounters an error during data fetching that prevents it from
+ * moving forward, potentially causing an infinite loop if the same record is repeatedly
+ * attempted.
+ * <p>
+ * When this exception is thrown and continueOnError is enabled, it will count toward
+ * the consecutive failure threshold to prevent infinite loops.
+ */
+public class RecordFetchException extends IOException {
+
+ private static final long serialVersionUID = 1L;
+
+ public RecordFetchException(String message) {
+ super(message);
+ }
+
+ public RecordFetchException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public RecordFetchException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
index db3a455da34..5b0a57f39d4 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
@@ -30,6 +30,17 @@ import javax.annotation.Nullable;
* The <code>RecordReader</code> interface is used to read records from various file formats into {@link GenericRow}s.
* Pinot segments will be generated from {@link GenericRow}s.
* <p>NOTE: for time column, record reader should be able to read both incoming and outgoing time
+ * <p>
+ * <h2>Exception Handling</h2>
+ * <p>When implementing {@link #next(GenericRow)}, implementations should use appropriate exception types to indicate
+ * the nature of the error:
+ * <ul>
+ * <li><b>{@link RecordFetchException}</b>: Throw when encountering I/O or data fetch errors that may prevent
+ * the reader from advancing its pointer to the next record. These errors count toward the consecutive failure
+ * threshold to prevent infinite loops.</li>
+ * <li><b>{@link IOException}</b>: For backward compatibility, IOException is treated as a parse error and can be
+ * skipped indefinitely.</li>
+ * </ul>
*/
public interface RecordReader extends Closeable, Serializable {
@@ -54,6 +65,18 @@ public interface RecordReader extends Closeable, Serializable {
* Get the next record.
* <p>This method should be called only if {@link #hasNext()} returns <code>true</code>. Caller is responsible for
* handling exceptions from this method and skip the row if user wants to continue reading the remaining rows.
+ * <p>
+ * <b>Exception Guidelines:</b>
+ * <ul>
+ * <li>Throw {@link RecordFetchException} if the reader cannot advance due to I/O or fetch errors</li>
+ * <li>Throw {@link IOException} if data is corrupt but the reader can advance to the next record</li>
+ * </ul>
+ * <b>Important:</b> If an exception is thrown, the implementation should ensure that either:
+ * <ul>
+ * <li>The reader's pointer advances to the next record (for parse errors), OR</li>
+ * <li>The reader throws RecordFetchException to indicate it cannot advance (for fetch errors)</li>
+ * </ul>
+ * This prevents infinite loops when the same record is repeatedly attempted.
*/
default GenericRow next()
throws IOException {
@@ -65,7 +88,21 @@ public interface RecordReader extends Closeable, Serializable {
* <p>The passed in row should be cleared before calling this method.
* <p>This method should be called only if {@link #hasNext()} returns <code>true</code>. Caller is responsible for
* handling exceptions from this method and skip the row if user wants to continue reading the remaining rows.
- *
+ * <p>
+ * <b>Exception Guidelines:</b>
+ * <ul>
+ * <li>Throw {@link RecordFetchException} if the reader cannot advance due to I/O or fetch errors (e.g., network
+ * failure, file system error, stream corruption that prevents reading). These errors count toward the
+ * consecutive failure threshold.</li>
+ * <li>Throw {@link IOException} if data is corrupt or malformed but the reader can advance to the next
+ * record (e.g., invalid format, type conversion failure, schema mismatch).</li>
+ * </ul>
+ * <b>Important:</b> If an exception is thrown, the implementation should ensure that either:
+ * <ul>
+ * <li>The reader's pointer advances to the next record (for parse errors), OR</li>
+ * <li>The reader throws RecordFetchException to indicate it cannot advance (for fetch errors)</li>
+ * </ul>
+ * This prevents infinite loops when the same record is repeatedly attempted.<br>
* TODO: Consider clearing the row within the record reader to simplify the caller
*/
GenericRow next(GenericRow reuse)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]