sohurdc commented on code in PR #8507: URL: https://github.com/apache/seatunnel/pull/8507#discussion_r1914025191
########## seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java: ########## @@ -80,6 +86,212 @@ public class OrcReadStrategy extends AbstractReadStrategy { private static final long MIN_SIZE = 16 * 1024; + private int batchReadRows = 1024; + + /** row count per split, as specified by the user */ + private long rowCountPerSplitByUser = 0; + + private final long DEFAULT_FILE_SIZE_PER_SPLIT = 1024 * 1024 * 30; + private final long DEFAULT_ROW_COUNT = 100000; + private long fileSizePerSplitByUser = DEFAULT_FILE_SIZE_PER_SPLIT; + + @Override + public void setPluginConfig(Config pluginConfig) { + super.setPluginConfig(pluginConfig); + if (pluginConfig.hasPath(BaseSourceConfigOptions.ROW_COUNT_PER_SPLIT.key())) { + rowCountPerSplitByUser = + pluginConfig.getLong(BaseSourceConfigOptions.ROW_COUNT_PER_SPLIT.key()); + } + if (pluginConfig.hasPath(BaseSourceConfigOptions.FILE_SIZE_PER_SPLIT.key())) { + fileSizePerSplitByUser = + pluginConfig.getLong(BaseSourceConfigOptions.FILE_SIZE_PER_SPLIT.key()); + } + } + + /** + * Splits a file into many splits. Pros: 1. lower memory occupancy — once a split has been read, its + * memory can be reclaimed. 2. lower checkpoint ack delay. 3. supports fine-grained concurrency. Cons: 1. cannot + * guarantee the order of the data. 
+ * + * @param path the file path + * @return FileSourceSplit set + */ + @Override + public Set<FileSourceSplit> getFileSourceSplits(String path) { + if (Boolean.FALSE.equals(checkFileType(path))) { + String errorMsg = + String.format( + "This file [%s] is not an orc file, please check the format of this file", + path); + throw new FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg); + } + Set<FileSourceSplit> fileSourceSplits = new HashSet<>(); + try (Reader reader = + hadoopFileSystemProxy.doWithHadoopAuth( + ((configuration, userGroupInformation) -> { + OrcFile.ReaderOptions readerOptions = + OrcFile.readerOptions(configuration); + return OrcFile.createReader(new Path(path), readerOptions); + }))) { + log.info( + "path:{}, rowCountPerSplitByUser:{}, fileSizePerSplitByUser:{}, fileSize:{}, stripeCount:{}, rowCount:{}", + path, + rowCountPerSplitByUser, + fileSizePerSplitByUser, + reader.getContentLength(), + reader.getStripes().size(), + reader.getNumberOfRows()); + long rowCountPerSplit = rowCountPerSplitByUser; + if (rowCountPerSplit <= 0) { + // automatically split by file size Review Comment: ok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org