sohurdc commented on code in PR #8507:
URL: https://github.com/apache/seatunnel/pull/8507#discussion_r1914025191


##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java:
##########
@@ -80,6 +86,212 @@
 public class OrcReadStrategy extends AbstractReadStrategy {
     private static final long MIN_SIZE = 16 * 1024;
 
+    private int batchReadRows = 1024;
+
+    /** user can specified row count per split */
+    private long rowCountPerSplitByUser = 0;
+
+    private final long DEFAULT_FILE_SIZE_PER_SPLIT = 1024 * 1024 * 30;
+    private final long DEFAULT_ROW_COUNT = 100000;
+    private long fileSizePerSplitByUser = DEFAULT_FILE_SIZE_PER_SPLIT;
+
+    @Override
+    public void setPluginConfig(Config pluginConfig) {
+        super.setPluginConfig(pluginConfig);
+        if 
(pluginConfig.hasPath(BaseSourceConfigOptions.ROW_COUNT_PER_SPLIT.key())) {
+            rowCountPerSplitByUser =
+                    
pluginConfig.getLong(BaseSourceConfigOptions.ROW_COUNT_PER_SPLIT.key());
+        }
+        if 
(pluginConfig.hasPath(BaseSourceConfigOptions.FILE_SIZE_PER_SPLIT.key())) {
+            fileSizePerSplitByUser =
+                    
pluginConfig.getLong(BaseSourceConfigOptions.FILE_SIZE_PER_SPLIT.key());
+        }
+    }
+
+    /**
+     * Splits a file into many splits.
+     *
+     * <p>Pros: 1. lower memory occupancy — when a split finishes reading, its memory can be
+     * recycled; 2. lower checkpoint ack delay; 3. supports fine-grained concurrency.
+     *
+     * <p>Cons: 1. cannot guarantee the order of the data.
+     *
+     * @param path the file path
+     * @return FileSourceSplit set
+     */
+    @Override
+    public Set<FileSourceSplit> getFileSourceSplits(String path) {
+        if (Boolean.FALSE.equals(checkFileType(path))) {
+            String errorMsg =
+                    String.format(
+                            "This file [%s] is not a orc file, please check 
the format of this file",
+                            path);
+            throw new 
FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
+        }
+        Set<FileSourceSplit> fileSourceSplits = new HashSet<>();
+        try (Reader reader =
+                hadoopFileSystemProxy.doWithHadoopAuth(
+                        ((configuration, userGroupInformation) -> {
+                            OrcFile.ReaderOptions readerOptions =
+                                    OrcFile.readerOptions(configuration);
+                            return OrcFile.createReader(new Path(path), 
readerOptions);
+                        }))) {
+            log.info(
+                    "path:{}, rowCountPerSplitByUser:{}, 
fileSizePerSplitByUser:{}, fileSize:{}, stripCount:{}, rowCount:{}",
+                    path,
+                    rowCountPerSplitByUser,
+                    fileSizePerSplitByUser,
+                    reader.getContentLength(),
+                    reader.getStripes().size(),
+                    reader.getNumberOfRows());
+            long rowCountPerSplit = rowCountPerSplitByUser;
+            if (rowCountPerSplit <= 0) {
+                // Split automatically based on file size

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to