This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c3e348693f CSVRecordReader. Refactor init and line iterator parsing logic (#13913)
c3e348693f is described below
commit c3e348693fe4b27a01e088be96160904cbb66ab5
Author: Suvodeep Pyne <[email protected]>
AuthorDate: Wed Sep 4 10:35:13 2024 -0700
CSVRecordReader. Refactor init and line iterator parsing logic (#13913)
---
.../plugin/inputformat/csv/CSVRecordReader.java | 389 ++++++++++-----------
.../inputformat/csv/CSVRecordReaderTest.java | 41 +++
.../pinot-csv/src/test/resources/dataFileBasic.csv | 5 +
.../resources/dataFileWithUnparseableLines2.csv | 5 +
4 files changed, 237 insertions(+), 203 deletions(-)
diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
index c4dc8c167f..daf6041a09 100644
--- a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
@@ -58,112 +59,124 @@ public class CSVRecordReader implements RecordReader {
private Iterator<CSVRecord> _iterator;
private CSVRecordExtractor _recordExtractor;
private Map<String, Integer> _headerMap = new HashMap<>();
- private boolean _isHeaderProvided = false;
- // line iterator specific variables
- private boolean _useLineIterator = false;
- private boolean _skipHeaderRecord = false;
- private long _skippedLinesCount;
private BufferedReader _bufferedReader;
- private String _nextLine;
- private GenericRow _nextRecord;
+ private CSVRecordReaderConfig _config = null;
public CSVRecordReader() {
}
- @Override
- public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
- throws IOException {
- _dataFile = dataFile;
- CSVRecordReaderConfig config = (CSVRecordReaderConfig) recordReaderConfig;
- Character multiValueDelimiter = null;
- if (config == null) {
- _format = CSVFormat.DEFAULT.builder().setDelimiter(CSVRecordReaderConfig.DEFAULT_DELIMITER).setHeader().build();
- multiValueDelimiter = CSVRecordReaderConfig.DEFAULT_MULTI_VALUE_DELIMITER;
- } else {
- CSVFormat format;
- String formatString = config.getFileFormat();
- if (formatString == null) {
- format = CSVFormat.DEFAULT;
- } else {
- switch (formatString.toUpperCase()) {
- case "EXCEL":
- format = CSVFormat.EXCEL;
- break;
- case "MYSQL":
- format = CSVFormat.MYSQL;
- break;
- case "RFC4180":
- format = CSVFormat.RFC4180;
- break;
- case "TDF":
- format = CSVFormat.TDF;
- break;
- default:
- format = CSVFormat.DEFAULT;
- break;
- }
- }
- char delimiter = config.getDelimiter();
- format = format.builder().setDelimiter(delimiter).build();
+ private static CSVFormat baseCsvFormat(CSVRecordReaderConfig config) {
+ if (config.getFileFormat() == null) {
+ return CSVFormat.DEFAULT;
+ }
+ switch (config.getFileFormat().toUpperCase()) {
+ case "EXCEL":
+ return CSVFormat.EXCEL;
+ case "MYSQL":
+ return CSVFormat.MYSQL;
+ case "RFC4180":
+ return CSVFormat.RFC4180;
+ case "TDF":
+ return CSVFormat.TDF;
+ default:
+ return CSVFormat.DEFAULT;
+ }
+ }
- if (config.isSkipUnParseableLines()) {
- _useLineIterator = true;
+ private static Map<String, Integer> parseHeaderMapFromLine(CSVFormat format, String line) {
+ try (StringReader stringReader = new StringReader(line)) {
+ try (CSVParser parser = format.parse(stringReader)) {
+ return parser.getHeaderMap();
}
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to parse header from line: " + line, e);
+ }
+ }
- _isHeaderProvided = config.getHeader() != null;
- _skipHeaderRecord = config.isSkipHeader();
- _format = format.builder()
- .setHeader()
- .setSkipHeaderRecord(config.isSkipHeader())
- .setCommentMarker(config.getCommentMarker())
- .setEscape(config.getEscapeCharacter())
- .setIgnoreEmptyLines(config.isIgnoreEmptyLines())
- .setIgnoreSurroundingSpaces(config.isIgnoreSurroundingSpaces())
- .setQuote(config.getQuoteCharacter())
- .build();
-
- if (config.getQuoteMode() != null) {
- _format = _format.builder().setQuoteMode(QuoteMode.valueOf(config.getQuoteMode())).build();
- }
+ private static Character getMultiValueDelimiter(CSVRecordReaderConfig config) {
+ if (config == null) {
+ return CSVRecordReaderConfig.DEFAULT_MULTI_VALUE_DELIMITER;
+ } else if (config.isMultiValueDelimiterEnabled()) {
+ return config.getMultiValueDelimiter();
+ }
+ return null;
+ }
- if (config.getRecordSeparator() != null) {
- _format = _format.builder().setRecordSeparator(config.getRecordSeparator()).build();
- }
+ private static boolean useLineIterator(CSVRecordReaderConfig config) {
+ return config != null && config.isSkipUnParseableLines();
+ }
- String nullString = config.getNullStringValue();
- if (nullString != null) {
- _format = _format.builder().setNullString(nullString).build();
- }
+ @Override
+ public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
+ throws IOException {
+ _dataFile = dataFile;
+ _config = (CSVRecordReaderConfig) recordReaderConfig;
+ _format = createCSVFormat();
+
+ // If header is provided by the client, use it. Otherwise, parse the header from the first line of the file.
+ // Overwrite the format with the header information.
+ Optional.ofNullable(_config).map(CSVRecordReaderConfig::getHeader).ifPresent(header -> {
+ _headerMap = parseHeaderMapFromLine(_format, header);
+ _format = _format.builder().setHeader(_headerMap.keySet().toArray(new String[0])).build();
+ });
+
+ validateHeaderWithDelimiter();
+ initIterator();
+ initRecordExtractor(fieldsToRead);
+ }
- if (_isHeaderProvided) {
- _headerMap = parseLineAsHeader(config.getHeader());
- _format = _format.builder().setHeader(_headerMap.keySet().toArray(new String[0])).build();
- if (!_useLineIterator) {
- validateHeaderForDelimiter(delimiter, config.getHeader(), _format);
- }
- }
+ private void initRecordExtractor(Set<String> fieldsToRead) {
+ final CSVRecordExtractorConfig recordExtractorConfig = new CSVRecordExtractorConfig();
+ recordExtractorConfig.setMultiValueDelimiter(getMultiValueDelimiter(_config));
+ recordExtractorConfig.setColumnNames(_headerMap.keySet());
+ _recordExtractor = new CSVRecordExtractor();
+ _recordExtractor.init(fieldsToRead, recordExtractorConfig);
+ }
- if (config.isMultiValueDelimiterEnabled()) {
- multiValueDelimiter = config.getMultiValueDelimiter();
- }
+ private CSVFormat createCSVFormat() {
+ if (_config == null) {
+ return CSVFormat.DEFAULT.builder().setDelimiter(CSVRecordReaderConfig.DEFAULT_DELIMITER).setHeader().build();
}
- _recordExtractor = new CSVRecordExtractor();
- init();
+ final CSVFormat.Builder builder = baseCsvFormat(_config).builder()
+ .setDelimiter(_config.getDelimiter())
+ .setHeader()
+ .setSkipHeaderRecord(_config.isSkipHeader())
+ .setCommentMarker(_config.getCommentMarker())
+ .setEscape(_config.getEscapeCharacter())
+ .setIgnoreEmptyLines(_config.isIgnoreEmptyLines())
+ .setIgnoreSurroundingSpaces(_config.isIgnoreSurroundingSpaces())
+ .setQuote(_config.getQuoteCharacter());
+
+ Optional.ofNullable(_config.getQuoteMode()).map(QuoteMode::valueOf).ifPresent(builder::setQuoteMode);
+ Optional.ofNullable(_config.getRecordSeparator()).ifPresent(builder::setRecordSeparator);
+ Optional.ofNullable(_config.getNullStringValue()).ifPresent(builder::setNullString);
+
+ return builder.build();
+ }
- CSVRecordExtractorConfig recordExtractorConfig = new CSVRecordExtractorConfig();
- recordExtractorConfig.setMultiValueDelimiter(multiValueDelimiter);
- recordExtractorConfig.setColumnNames(_headerMap.keySet());
- _recordExtractor.init(fieldsToRead, recordExtractorConfig);
+ private void initIterator()
+ throws IOException {
+ if (useLineIterator(_config)) {
+ _bufferedReader = new BufferedReader(new FileReader(_dataFile), 1024 * 32); // 32KB buffer size
+ _iterator = new LineIterator();
+ } else {
+ _parser = _format.parse(RecordReaderUtils.getBufferedReader(_dataFile));
+ _headerMap = _parser.getHeaderMap();
+ _iterator = _parser.iterator();
+ }
}
- private void validateHeaderForDelimiter(char delimiter, String csvHeader, CSVFormat format)
+ private void validateHeaderWithDelimiter()
throws IOException {
- CSVParser parser = format.parse(RecordReaderUtils.getBufferedReader(_dataFile));
- Iterator<CSVRecord> iterator = parser.iterator();
- if (iterator.hasNext() && recordHasMultipleValues(iterator.next()) && delimiterNotPresentInHeader(delimiter,
- csvHeader)) {
+ if (_config == null || _config.getHeader() == null || useLineIterator(_config)) {
+ return;
+ }
+ final CSVParser parser = _format.parse(RecordReaderUtils.getBufferedReader(_dataFile));
+ final Iterator<CSVRecord> iterator = parser.iterator();
+ if (iterator.hasNext() && recordHasMultipleValues(iterator.next()) && delimiterNotPresentInHeader(
+ _config.getDelimiter(), _config.getHeader())) {
throw new IllegalArgumentException("Configured header does not contain the configured delimiter");
}
}
@@ -176,17 +189,6 @@ public class CSVRecordReader implements RecordReader {
return !StringUtils.contains(csvHeader, delimiter);
}
- private void init()
- throws IOException {
- if (_useLineIterator) {
- initLineIteratorResources();
- return;
- }
- _parser = _format.parse(RecordReaderUtils.getBufferedReader(_dataFile));
- _headerMap = _parser.getHeaderMap();
- _iterator = _parser.iterator();
- }
-
/**
* Returns a copy of the header map that iterates in column order.
* <p>
@@ -202,158 +204,139 @@ public class CSVRecordReader implements RecordReader {
@Override
public boolean hasNext() {
- if (_useLineIterator) {
- // When line iterator is used, the call to this method won't throw an exception. The default and the only iterator
- // from commons-csv library can throw an exception upon calling the hasNext() method. The line iterator overcomes
- // this limitation.
- return readNextRecord();
- }
return _iterator.hasNext();
}
@Override
public GenericRow next()
throws IOException {
- if (_useLineIterator) {
- return _nextRecord;
- } else {
- return next(new GenericRow());
- }
+ return next(new GenericRow());
}
@Override
public GenericRow next(GenericRow reuse)
throws IOException {
- if (_useLineIterator) {
- reuse.init(_nextRecord);
- } else {
- CSVRecord record = _iterator.next();
- _recordExtractor.extract(record, reuse);
- }
+ CSVRecord record = _iterator.next();
+ _recordExtractor.extract(record, reuse);
return reuse;
}
@Override
public void rewind()
throws IOException {
- if (_useLineIterator) {
- resetLineIteratorResources();
- }
-
if (_parser != null && !_parser.isClosed()) {
_parser.close();
}
-
- init();
+ closeIterator();
+ initIterator();
}
@Override
public void close()
throws IOException {
- if (_useLineIterator) {
- resetLineIteratorResources();
- }
+ closeIterator();
if (_parser != null && !_parser.isClosed()) {
_parser.close();
}
}
- private boolean readNextRecord() {
- try {
- _nextRecord = null;
- GenericRow genericRow = new GenericRow();
- readNextLine(genericRow);
- _nextRecord = genericRow;
- } catch (Exception e) {
- LOGGER.info("Error parsing next record.", e);
+ private void closeIterator()
+ throws IOException {
+ // if header is not provided by the client it would be rebuilt. When it's provided by the client it's initialized
+ // once in the constructor
+ if (useLineIterator(_config) && _config.getHeader() == null) {
+ _headerMap.clear();
+ }
+
+ if (_bufferedReader != null) {
+ _bufferedReader.close();
}
- return _nextRecord != null;
}
- private void readNextLine(GenericRow reuse)
- throws IOException {
- while (_nextLine != null) {
- try (Reader reader = new StringReader(_nextLine)) {
- try (CSVParser csvParser = _format.parse(reader)) {
- List<CSVRecord> csvRecords = csvParser.getRecords();
- if (csvRecords != null && csvRecords.size() > 0) {
- // There would be only one record as lines are read one after the other
- CSVRecord record = csvRecords.get(0);
- _recordExtractor.extract(record, reuse);
- break;
- } else {
- // Can be thrown on: 1) Empty lines 2) Commented lines
- throw new NoSuchElementException("Failed to find any records");
+ class LineIterator implements Iterator<CSVRecord> {
+ private String _nextLine;
+ private CSVRecord _current;
+
+ public LineIterator() {
+ init();
+ }
+
+ private void init() {
+ try {
+ if (_config.getHeader() != null) {
+ if (_config.isSkipHeader()) {
+ // When skip header config is set and header is supplied – skip the first line from the input file
+ _bufferedReader.readLine();
+ // turn off the property so that it doesn't interfere with further parsing
+ _format = _format.builder().setSkipHeaderRecord(false).build();
}
- } catch (Exception e) {
- _skippedLinesCount++;
- LOGGER.debug("Skipped input line: {} from file: {}", _nextLine, _dataFile, e);
- // Find the next line that can be parsed
- _nextLine = _bufferedReader.readLine();
+ } else {
+ // read the first line
+ String headerLine = _bufferedReader.readLine();
+ _headerMap = parseHeaderMapFromLine(_format, headerLine);
+ // If header isn't provided, the first line would be set as header and the 'skipHeader' property
+ // is set to false.
+ _format = _format.builder()
+ .setSkipHeaderRecord(false)
+ .setHeader(_headerMap.keySet().toArray(new String[0]))
+ .build();
}
+ _nextLine = _bufferedReader.readLine();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
- if (_nextLine != null) {
- // Advance the pointer to the next line for future reading
- _nextLine = _bufferedReader.readLine();
- } else {
- throw new RuntimeException("No more parseable lines. Line iterator reached end of file.");
- }
- }
- private Map<String, Integer> parseLineAsHeader(String line)
- throws IOException {
- Map<String, Integer> headerMap;
- try (StringReader stringReader = new StringReader(line)) {
- try (CSVParser parser = _format.parse(stringReader)) {
- headerMap = parser.getHeaderMap();
+ private CSVRecord getNextRecord() {
+ while (_nextLine != null) {
+ try (Reader reader = new StringReader(_nextLine)) {
+ try (CSVParser csvParser = _format.parse(reader)) {
+ List<CSVRecord> csvRecords = csvParser.getRecords();
+ if (csvRecords == null || csvRecords.isEmpty()) {
+ // Can be thrown on: 1) Empty lines 2) Commented lines
+ throw new NoSuchElementException("Failed to find any records");
+ }
+ // There would be only one record as lines are read one after the other
+ CSVRecord csvRecord = csvRecords.get(0);
+
+ // move the pointer to the next line
+ _nextLine = _bufferedReader.readLine();
+ return csvRecord;
+ } catch (Exception e) {
+ // Find the next line that can be parsed
+ _nextLine = _bufferedReader.readLine();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
+ return null;
}
- return headerMap;
- }
- private void initLineIteratorResources()
- throws IOException {
- _bufferedReader = new BufferedReader(new FileReader(_dataFile), 1024 * 32); // 32KB buffer size
-
- // When header is supplied by the client
- if (_isHeaderProvided) {
- if (_skipHeaderRecord) {
- // When skip header config is set and header is supplied – skip the first line from the input file
- _bufferedReader.readLine();
- // turn off the property so that it doesn't interfere with further parsing
- _format = _format.builder().setSkipHeaderRecord(false).build();
+ @Override
+ public boolean hasNext() {
+ if (_current == null) {
+ _current = getNextRecord();
}
- } else {
- // read the first line
- String headerLine = _bufferedReader.readLine();
- _headerMap = parseLineAsHeader(headerLine);
- _format = _format.builder()
- // If header isn't provided, the first line would be set as header and the 'skipHeader' property
- // is set to false.
- .setSkipHeaderRecord(false)
- .setHeader(_headerMap.keySet().toArray(new String[0]))
- .build();
- }
- _nextLine = _bufferedReader.readLine();
- }
- private void resetLineIteratorResources()
- throws IOException {
- _nextLine = null;
+ return _current != null;
+ }
- LOGGER.info("Total lines skipped in file: {} were: {}", _dataFile, _skippedLinesCount);
- _skippedLinesCount = 0;
+ @Override
+ public CSVRecord next() {
+ CSVRecord next = _current;
+ _current = null;
- // if header is not provided by the client it would be rebuilt. When it's provided by the client it's initialized
- // once in the constructor
- if (!_isHeaderProvided) {
- _headerMap.clear();
- }
+ if (next == null) {
+ // hasNext() wasn't called before
+ next = getNextRecord();
+ if (next == null) {
+ throw new NoSuchElementException("No more CSV records available");
+ }
+ }
- if (_bufferedReader != null) {
- _bufferedReader.close();
+ return next;
}
}
}
diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
index d245fb33c0..6a1d86a48d 100644
--- a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
@@ -324,6 +324,47 @@ public class CSVRecordReaderTest extends AbstractRecordReaderTest {
Assert.assertEquals(1, genericRows.size());
}
+ @Test
+ public void testLineIteratorReadingDataFileWithUnparseableLinesWithRewind()
+ throws URISyntaxException, IOException {
+ URI uri = ClassLoader.getSystemResource("dataFileWithUnparseableLines2.csv").toURI();
+ File dataFile = new File(uri);
+
+ CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+ readerConfig.setSkipUnParseableLines(true);
+ final List<GenericRow> genericRows1 = readCSVRecords(dataFile, readerConfig, null, true);
+ Assert.assertEquals(3, genericRows1.size());
+
+ // Start reading again; results should be same
+ final List<GenericRow> genericRows2 = readCSVRecords(dataFile, readerConfig, null, true);
+ Assert.assertEquals(3, genericRows2.size());
+
+ // Check that the rows are the same
+ for (int i = 0; i < genericRows1.size(); i++) {
+ Assert.assertEquals(genericRows1.get(i), genericRows2.get(i));
+ }
+ }
+
+ @Test
+ public void testReadingDataFileWithRewind()
+ throws URISyntaxException, IOException {
+ URI uri = ClassLoader.getSystemResource("dataFileBasic.csv").toURI();
+ File dataFile = new File(uri);
+
+ CSVRecordReaderConfig readerConfig = new CSVRecordReaderConfig();
+ List<GenericRow> genericRows1 = readCSVRecords(dataFile, readerConfig, null, true);
+ Assert.assertEquals(4, genericRows1.size());
+
+ // Start reading again; results should be same
+ List<GenericRow> genericRows2 = readCSVRecords(dataFile, readerConfig, null, true);
+ Assert.assertEquals(4, genericRows2.size());
+
+ // Check that the rows are the same
+ for (int i = 0; i < genericRows1.size(); i++) {
+ Assert.assertEquals(genericRows1.get(i), genericRows2.get(i));
+ }
+ }
+
@Test (expectedExceptions = RuntimeException.class)
public void testDefaultCsvReaderExceptionReadingDataFileWithUnparseableLines()
throws URISyntaxException, IOException {
diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv
new file mode 100644
index 0000000000..c2b0fe3262
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileBasic.csv
@@ -0,0 +1,5 @@
+id,name
+"100","John"
+"101","Jane"
+"102","Alice"
+"103","Bob"
diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines2.csv b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines2.csv
new file mode 100644
index 0000000000..80e9a736c3
--- /dev/null
+++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/resources/dataFileWithUnparseableLines2.csv
@@ -0,0 +1,5 @@
+id,name
+"100","John"s"
+"101","Jane"
+"102","Alice"
+"103","Bob"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]