Copilot commented on code in PR #9391:
URL: https://github.com/apache/seatunnel/pull/9391#discussion_r2123260473


##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java:
##########
@@ -43,11 +45,34 @@ public class BinaryReadStrategy extends AbstractReadStrategy {
                     });
 
     private File basePath;
+    private int binaryChunkSize = FileBaseSourceOptions.BINARY_CHUNK_SIZE.defaultValue();
+    private boolean completeFileMode =
+            FileBaseSourceOptions.BINARY_COMPLETE_FILE_MODE.defaultValue();
 
     @Override
     public void init(HadoopConf conf) {
         super.init(conf);
         basePath = new File(pluginConfig.getString(FileBaseSourceOptions.FILE_PATH.key()));
+
+        // Load binary chunk size configuration
+        if (pluginConfig.hasPath(FileBaseSourceOptions.BINARY_CHUNK_SIZE.key())) {
+            binaryChunkSize = pluginConfig.getInt(FileBaseSourceOptions.BINARY_CHUNK_SIZE.key());
+            // Validate chunk size - should be positive and reasonable
+            if (binaryChunkSize <= 0) {
+                throw new IllegalArgumentException(
+                        "Binary chunk size must be positive, got: " + binaryChunkSize);
+            }
+            if (binaryChunkSize > 100 * 1024 * 1024) { // 100MB limit

Review Comment:
   [nitpick] Extract the `100 * 1024 * 1024` magic number into a descriptive 
static final constant (e.g., `MAX_BINARY_CHUNK_SIZE`) to improve readability 
and maintainability.
   ```suggestion
               if (binaryChunkSize > MAX_BINARY_CHUNK_SIZE) { // 100MB limit
   ```
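
   As a self-contained illustration of the suggested refactor (class and method names here are illustrative, not from the PR), the constant plus the existing validation could look like:

   ```java
   public class ChunkSizeCheck {
       // Descriptive constant replacing the 100 * 1024 * 1024 magic number
       private static final int MAX_BINARY_CHUNK_SIZE = 100 * 1024 * 1024; // 100MB

       static void validateChunkSize(int binaryChunkSize) {
           if (binaryChunkSize <= 0) {
               throw new IllegalArgumentException(
                       "Binary chunk size must be positive, got: " + binaryChunkSize);
           }
           if (binaryChunkSize > MAX_BINARY_CHUNK_SIZE) {
               throw new IllegalArgumentException(
                       "Binary chunk size must not exceed " + MAX_BINARY_CHUNK_SIZE
                               + " bytes, got: " + binaryChunkSize);
           }
       }

       public static void main(String[] args) {
           validateChunkSize(1024); // within bounds, passes silently
           try {
               validateChunkSize(200 * 1024 * 1024); // over the 100MB cap
           } catch (IllegalArgumentException e) {
               System.out.println("rejected oversized chunk");
           }
       }
   }
   ```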



##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java:
##########
@@ -43,11 +45,34 @@ public class BinaryReadStrategy extends AbstractReadStrategy {
                     });
 
     private File basePath;
+    private int binaryChunkSize = FileBaseSourceOptions.BINARY_CHUNK_SIZE.defaultValue();
+    private boolean completeFileMode =
+            FileBaseSourceOptions.BINARY_COMPLETE_FILE_MODE.defaultValue();
 
     @Override
     public void init(HadoopConf conf) {
         super.init(conf);
         basePath = new File(pluginConfig.getString(FileBaseSourceOptions.FILE_PATH.key()));
+
+        // Load binary chunk size configuration
+        if (pluginConfig.hasPath(FileBaseSourceOptions.BINARY_CHUNK_SIZE.key())) {
+            binaryChunkSize = pluginConfig.getInt(FileBaseSourceOptions.BINARY_CHUNK_SIZE.key());
+            // Validate chunk size - should be positive and reasonable
+            if (binaryChunkSize <= 0) {
+                throw new IllegalArgumentException(
+                        "Binary chunk size must be positive, got: " + binaryChunkSize);
+            }
+            if (binaryChunkSize > 100 * 1024 * 1024) { // 100MB limit
+                throw new IllegalArgumentException(

Review Comment:
   [nitpick] Consider using a connector-specific exception (e.g., 
`FileConnectorException`) instead of `IllegalArgumentException` to provide 
consistent error reporting within the framework.
   ```suggestion
                   throw new FileConnectorException(
                           "FILE_CONNECTOR_ERROR",
                           "Binary chunk size must be positive, got: " + binaryChunkSize);
               }
               if (binaryChunkSize > 100 * 1024 * 1024) { // 100MB limit
                   throw new FileConnectorException(
                           "FILE_CONNECTOR_ERROR",
   ```
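
   Note that SeaTunnel's actual `FileConnectorException` is constructed from an error-code enum rather than a raw string, so the literal `"FILE_CONNECTOR_ERROR"` above would need to map to one of the framework's error codes. A self-contained sketch of the pattern, using a minimal local stand-in for the real exception class:

   ```java
   public class ConnectorExceptionSketch {
       // Minimal stand-in for the framework's connector exception; the real
       // FileConnectorException takes a SeaTunnelErrorCode enum, not a raw string.
       static class FileConnectorException extends RuntimeException {
           FileConnectorException(String errorCode, String message) {
               super(errorCode + " - " + message);
           }
       }

       static void validate(int binaryChunkSize) {
           if (binaryChunkSize <= 0) {
               throw new FileConnectorException(
                       "FILE_CONNECTOR_ERROR",
                       "Binary chunk size must be positive, got: " + binaryChunkSize);
           }
       }

       public static void main(String[] args) {
           try {
               validate(-1);
           } catch (FileConnectorException e) {
               System.out.println(e.getMessage());
           }
       }
   }
   ```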



##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java:
##########
@@ -66,21 +91,49 @@ public void read(String path, String tableId, Collector<SeaTunnelRow> output)
                     relativePath = relativePath.substring(File.separator.length());
                 }
             }
-            // TODO config this size
-            int maxSize = 1024;
-            byte[] buffer = new byte[maxSize];
-            long partIndex = 0;
-            int readSize;
-            while ((readSize = inputStream.read(buffer)) != -1) {
-                if (readSize != maxSize) {
-                    buffer = Arrays.copyOf(buffer, readSize);
-                }
-                SeaTunnelRow row = new SeaTunnelRow(new Object[] {buffer, relativePath, partIndex});
-                buffer = new byte[1024];
-                row.setTableId(tableId);
-                output.collect(row);
-                partIndex++;
+
+            if (completeFileMode) {
+                // Read entire file as a single chunk
+                readCompleteFile(inputStream, relativePath, tableId, output);
+            } else {
+                // Read file in configurable chunks
+                readFileInChunks(inputStream, relativePath, tableId, output);
+            }
+        }
+    }
+
+    /** Read the entire file as a single chunk. */
+    private void readCompleteFile(
+            InputStream inputStream,
+            String relativePath,
+            String tableId,
+            Collector<SeaTunnelRow> output)
+            throws IOException {
+        byte[] fileContent = IOUtils.toByteArray(inputStream);
+        SeaTunnelRow row = new SeaTunnelRow(new Object[] {fileContent, relativePath, 0L});
+        row.setTableId(tableId);
+        output.collect(row);

Review Comment:
   Reading the entire file into memory via `IOUtils.toByteArray` can lead to 
high memory usage for large files; consider imposing a size check or using a 
streaming approach to avoid potential OOM issues.
   ```suggestion
           try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
               byte[] buffer = new byte[binaryChunkSize];
               int bytesRead;
               while ((bytesRead = inputStream.read(buffer)) != -1) {
                   outputStream.write(buffer, 0, bytesRead);
               }
               byte[] fileContent = outputStream.toByteArray();
               SeaTunnelRow row = new SeaTunnelRow(new Object[] {fileContent, relativePath, 0L});
               row.setTableId(tableId);
               output.collect(row);
           }
   ```
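
   The buffered copy in the suggestion still accumulates the whole file in memory; an explicit size guard during the copy at least fails fast with a clear error instead of risking an OOM. A self-contained sketch of that idea, with an illustrative limit (names here are not from the PR):

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.InputStream;

   public class BoundedFileRead {
       // Illustrative cap; in the connector this could reuse the chunk-size limit
       static final long MAX_COMPLETE_FILE_BYTES = 100L * 1024 * 1024;

       static byte[] readAllBounded(InputStream in, long maxBytes) throws IOException {
           ByteArrayOutputStream out = new ByteArrayOutputStream();
           byte[] buffer = new byte[8192];
           long total = 0;
           int read;
           while ((read = in.read(buffer)) != -1) {
               total += read;
               if (total > maxBytes) {
                   // Fail fast instead of buffering an arbitrarily large file
                   throw new IOException(
                           "File exceeds complete-file-mode limit of " + maxBytes + " bytes");
               }
               out.write(buffer, 0, read);
           }
           return out.toByteArray();
       }

       public static void main(String[] args) throws IOException {
           byte[] data = new byte[2048];
           byte[] copy = readAllBounded(new ByteArrayInputStream(data), MAX_COMPLETE_FILE_BYTES);
           System.out.println(copy.length);
       }
   }
   ```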



##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/BinaryReadStrategyTest.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.reader;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.BinaryReadStrategy;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import lombok.Getter;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
+
+public class BinaryReadStrategyTest {
+
+    @TempDir Path tempDir;
+
+    private BinaryReadStrategy binaryReadStrategy;
+    private LocalConf localConf;
+
+    @BeforeEach
+    public void setUp() {
+        binaryReadStrategy = new BinaryReadStrategy();
+        localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+    }
+
+    @Test
+    public void testBinaryReadWithDefaultChunkSize() throws IOException {
+        // Create a test file with 2048 bytes (2 chunks of 1024 bytes each)
+        File testFile = createTestFile("test_binary_default.bin", 2048);
+
+        Config config = createConfig(testFile.getParent(), null, null);
+        binaryReadStrategy.setPluginConfig(config);
+        binaryReadStrategy.init(localConf);
+
+        TestCollector collector = new TestCollector();
+        binaryReadStrategy.read(testFile.getAbsolutePath(), "test_table", collector);
+
+        List<SeaTunnelRow> rows = collector.getRows();
+        Assertions.assertEquals(
+                2, rows.size(), "Should have 2 chunks for 2048 bytes with default 1024 chunk size");
+
+        // Verify first chunk
+        SeaTunnelRow firstRow = rows.get(0);
+        Assertions.assertEquals(3, firstRow.getArity());
+        byte[] firstChunkData = (byte[]) firstRow.getField(0);
+        Assertions.assertEquals(1024, firstChunkData.length);
+        Assertions.assertEquals("test_binary_default.bin", firstRow.getField(1));
+        Assertions.assertEquals(0L, firstRow.getField(2));
+
+        // Verify second chunk
+        SeaTunnelRow secondRow = rows.get(1);
+        byte[] secondChunkData = (byte[]) secondRow.getField(0);
+        Assertions.assertEquals(1024, secondChunkData.length);
+        Assertions.assertEquals("test_binary_default.bin", secondRow.getField(1));
+        Assertions.assertEquals(1L, secondRow.getField(2));
+    }
+
+    @Test
+    public void testBinaryReadWithCustomChunkSize() throws IOException {
+        // Create a test file with 1500 bytes
+        File testFile = createTestFile("test_binary_custom.bin", 1500);
+
+        Config config = createConfig(testFile.getParent(), 512, null);
+        binaryReadStrategy.setPluginConfig(config);
+        binaryReadStrategy.init(localConf);
+
+        TestCollector collector = new TestCollector();
+        binaryReadStrategy.read(testFile.getAbsolutePath(), "test_table", collector);
+
+        List<SeaTunnelRow> rows = collector.getRows();
+        Assertions.assertEquals(
+                3, rows.size(), "Should have 3 chunks for 1500 bytes with 512 chunk size");
+
+        // Verify chunk sizes: 512, 512, 476
+        Assertions.assertEquals(512, ((byte[]) rows.get(0).getField(0)).length);
+        Assertions.assertEquals(512, ((byte[]) rows.get(1).getField(0)).length);
+        Assertions.assertEquals(476, ((byte[]) rows.get(2).getField(0)).length);
+
+        // Verify part indices
+        Assertions.assertEquals(0L, rows.get(0).getField(2));
+        Assertions.assertEquals(1L, rows.get(1).getField(2));
+        Assertions.assertEquals(2L, rows.get(2).getField(2));
+    }
+
+    @Test
+    public void testBinaryReadCompleteFileMode() throws IOException {
+        // Create a test file with 2048 bytes
+        File testFile = createTestFile("test_binary_complete.bin", 2048);
+
+        Config config = createConfig(testFile.getParent(), null, true);
+        binaryReadStrategy.setPluginConfig(config);
+        binaryReadStrategy.init(localConf);
+
+        TestCollector collector = new TestCollector();
+        binaryReadStrategy.read(testFile.getAbsolutePath(), "test_table", collector);
+
+        List<SeaTunnelRow> rows = collector.getRows();
+        Assertions.assertEquals(1, rows.size(), "Should have 1 row in complete file mode");
+
+        SeaTunnelRow row = rows.get(0);
+        byte[] fileData = (byte[]) row.getField(0);
+        Assertions.assertEquals(2048, fileData.length, "Should read entire file content");
+        Assertions.assertEquals("test_binary_complete.bin", row.getField(1));
+        Assertions.assertEquals(0L, row.getField(2));
+    }
+
+    private File createTestFile(String fileName, int sizeInBytes) throws IOException {
+        File testFile = tempDir.resolve(fileName).toFile();
+
+        if (sizeInBytes > 0) {
+            try (FileOutputStream fos = new FileOutputStream(testFile)) {
+                // Create test data with a pattern for verification
+                byte[] pattern = "SEATUNNEL_TEST_DATA_".getBytes();
+                int written = 0;
+                while (written < sizeInBytes) {
+                    int toWrite = Math.min(pattern.length, sizeInBytes - written);
+                    fos.write(pattern, 0, toWrite);
+                    written += toWrite;
+                }
+            }
+        } else {
+            // Create empty file
+            testFile.createNewFile();
+        }
+
+        return testFile;
+    }
+
+    private Config createConfig(String filePath, Integer chunkSize, Boolean completeFileMode) {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put("path", filePath); // Fixed: use "path" instead of "file_path"

Review Comment:
   [nitpick] Avoid hardcoding the config key string `"path"`; use 
`FileBaseSourceOptions.FILE_PATH.key()` to populate the configuration map so it 
stays in sync with the option definition.
   ```suggestion
           configMap.put(FileBaseSourceOptions.FILE_PATH.key(), filePath);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
