Copilot commented on code in PR #9391:
URL: https://github.com/apache/seatunnel/pull/9391#discussion_r2123260473
##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java:
##########
@@ -43,11 +45,34 @@ public class BinaryReadStrategy extends
AbstractReadStrategy {
});
private File basePath;
+ private int binaryChunkSize =
FileBaseSourceOptions.BINARY_CHUNK_SIZE.defaultValue();
+ private boolean completeFileMode =
+ FileBaseSourceOptions.BINARY_COMPLETE_FILE_MODE.defaultValue();
@Override
public void init(HadoopConf conf) {
super.init(conf);
basePath = new
File(pluginConfig.getString(FileBaseSourceOptions.FILE_PATH.key()));
+
+ // Load binary chunk size configuration
+ if
(pluginConfig.hasPath(FileBaseSourceOptions.BINARY_CHUNK_SIZE.key())) {
+ binaryChunkSize =
pluginConfig.getInt(FileBaseSourceOptions.BINARY_CHUNK_SIZE.key());
+ // Validate chunk size - should be positive and reasonable
+ if (binaryChunkSize <= 0) {
+ throw new IllegalArgumentException(
+ "Binary chunk size must be positive, got: " +
binaryChunkSize);
+ }
+ if (binaryChunkSize > 100 * 1024 * 1024) { // 100MB limit
Review Comment:
[nitpick] Extract the `100 * 1024 * 1024` magic number into a descriptive
static final constant (e.g., `MAX_BINARY_CHUNK_SIZE`) to improve readability
and maintainability.
```suggestion
if (binaryChunkSize > MAX_BINARY_CHUNK_SIZE) { // 100MB limit
```
##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java:
##########
@@ -43,11 +45,34 @@ public class BinaryReadStrategy extends
AbstractReadStrategy {
});
private File basePath;
+ private int binaryChunkSize =
FileBaseSourceOptions.BINARY_CHUNK_SIZE.defaultValue();
+ private boolean completeFileMode =
+ FileBaseSourceOptions.BINARY_COMPLETE_FILE_MODE.defaultValue();
@Override
public void init(HadoopConf conf) {
super.init(conf);
basePath = new
File(pluginConfig.getString(FileBaseSourceOptions.FILE_PATH.key()));
+
+ // Load binary chunk size configuration
+ if
(pluginConfig.hasPath(FileBaseSourceOptions.BINARY_CHUNK_SIZE.key())) {
+ binaryChunkSize =
pluginConfig.getInt(FileBaseSourceOptions.BINARY_CHUNK_SIZE.key());
+ // Validate chunk size - should be positive and reasonable
+ if (binaryChunkSize <= 0) {
+ throw new IllegalArgumentException(
+ "Binary chunk size must be positive, got: " +
binaryChunkSize);
+ }
+ if (binaryChunkSize > 100 * 1024 * 1024) { // 100MB limit
+ throw new IllegalArgumentException(
Review Comment:
[nitpick] Consider using a connector-specific exception (e.g.,
`FileConnectorException`) instead of `IllegalArgumentException` to provide
consistent error reporting within the framework.
```suggestion
throw new FileConnectorException(
"FILE_CONNECTOR_ERROR",
"Binary chunk size must be positive, got: " +
binaryChunkSize);
}
if (binaryChunkSize > 100 * 1024 * 1024) { // 100MB limit
throw new FileConnectorException(
"FILE_CONNECTOR_ERROR",
```
##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/BinaryReadStrategy.java:
##########
@@ -66,21 +91,49 @@ public void read(String path, String tableId,
Collector<SeaTunnelRow> output)
relativePath =
relativePath.substring(File.separator.length());
}
}
- // TODO config this size
- int maxSize = 1024;
- byte[] buffer = new byte[maxSize];
- long partIndex = 0;
- int readSize;
- while ((readSize = inputStream.read(buffer)) != -1) {
- if (readSize != maxSize) {
- buffer = Arrays.copyOf(buffer, readSize);
- }
- SeaTunnelRow row = new SeaTunnelRow(new Object[] {buffer,
relativePath, partIndex});
- buffer = new byte[1024];
- row.setTableId(tableId);
- output.collect(row);
- partIndex++;
+
+ if (completeFileMode) {
+ // Read entire file as a single chunk
+ readCompleteFile(inputStream, relativePath, tableId, output);
+ } else {
+ // Read file in configurable chunks
+ readFileInChunks(inputStream, relativePath, tableId, output);
+ }
+ }
+ }
+
+ /** Read the entire file as a single chunk. */
+ private void readCompleteFile(
+ InputStream inputStream,
+ String relativePath,
+ String tableId,
+ Collector<SeaTunnelRow> output)
+ throws IOException {
+ byte[] fileContent = IOUtils.toByteArray(inputStream);
+ SeaTunnelRow row = new SeaTunnelRow(new Object[] {fileContent,
relativePath, 0L});
+ row.setTableId(tableId);
+ output.collect(row);
Review Comment:
Reading the entire file into memory via `IOUtils.toByteArray` can lead to
high memory usage for large files; consider enforcing a maximum file size
limit before buffering, or emitting the content incrementally (e.g., reusing
the chunked read path), to avoid potential OOM issues.
```suggestion
try (ByteArrayOutputStream outputStream = new
ByteArrayOutputStream()) {
byte[] buffer = new byte[binaryChunkSize];
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
}
byte[] fileContent = outputStream.toByteArray();
SeaTunnelRow row = new SeaTunnelRow(new Object[] {fileContent,
relativePath, 0L});
row.setTableId(tableId);
output.collect(row);
}
```
##########
seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/BinaryReadStrategyTest.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.reader;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.BinaryReadStrategy;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import lombok.Getter;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
+
+public class BinaryReadStrategyTest {
+
+ @TempDir Path tempDir;
+
+ private BinaryReadStrategy binaryReadStrategy;
+ private LocalConf localConf;
+
+ @BeforeEach
+ public void setUp() {
+ binaryReadStrategy = new BinaryReadStrategy();
+ localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ }
+
+ @Test
+ public void testBinaryReadWithDefaultChunkSize() throws IOException {
+ // Create a test file with 2048 bytes (2 chunks of 1024 bytes each)
+ File testFile = createTestFile("test_binary_default.bin", 2048);
+
+ Config config = createConfig(testFile.getParent(), null, null);
+ binaryReadStrategy.setPluginConfig(config);
+ binaryReadStrategy.init(localConf);
+
+ TestCollector collector = new TestCollector();
+ binaryReadStrategy.read(testFile.getAbsolutePath(), "test_table",
collector);
+
+ List<SeaTunnelRow> rows = collector.getRows();
+ Assertions.assertEquals(
+ 2, rows.size(), "Should have 2 chunks for 2048 bytes with
default 1024 chunk size");
+
+ // Verify first chunk
+ SeaTunnelRow firstRow = rows.get(0);
+ Assertions.assertEquals(3, firstRow.getArity());
+ byte[] firstChunkData = (byte[]) firstRow.getField(0);
+ Assertions.assertEquals(1024, firstChunkData.length);
+ Assertions.assertEquals("test_binary_default.bin",
firstRow.getField(1));
+ Assertions.assertEquals(0L, firstRow.getField(2));
+
+ // Verify second chunk
+ SeaTunnelRow secondRow = rows.get(1);
+ byte[] secondChunkData = (byte[]) secondRow.getField(0);
+ Assertions.assertEquals(1024, secondChunkData.length);
+ Assertions.assertEquals("test_binary_default.bin",
secondRow.getField(1));
+ Assertions.assertEquals(1L, secondRow.getField(2));
+ }
+
+ @Test
+ public void testBinaryReadWithCustomChunkSize() throws IOException {
+ // Create a test file with 1500 bytes
+ File testFile = createTestFile("test_binary_custom.bin", 1500);
+
+ Config config = createConfig(testFile.getParent(), 512, null);
+ binaryReadStrategy.setPluginConfig(config);
+ binaryReadStrategy.init(localConf);
+
+ TestCollector collector = new TestCollector();
+ binaryReadStrategy.read(testFile.getAbsolutePath(), "test_table",
collector);
+
+ List<SeaTunnelRow> rows = collector.getRows();
+ Assertions.assertEquals(
+ 3, rows.size(), "Should have 3 chunks for 1500 bytes with 512
chunk size");
+
+ // Verify chunk sizes: 512, 512, 476
+ Assertions.assertEquals(512, ((byte[])
rows.get(0).getField(0)).length);
+ Assertions.assertEquals(512, ((byte[])
rows.get(1).getField(0)).length);
+ Assertions.assertEquals(476, ((byte[])
rows.get(2).getField(0)).length);
+
+ // Verify part indices
+ Assertions.assertEquals(0L, rows.get(0).getField(2));
+ Assertions.assertEquals(1L, rows.get(1).getField(2));
+ Assertions.assertEquals(2L, rows.get(2).getField(2));
+ }
+
+ @Test
+ public void testBinaryReadCompleteFileMode() throws IOException {
+ // Create a test file with 2048 bytes
+ File testFile = createTestFile("test_binary_complete.bin", 2048);
+
+ Config config = createConfig(testFile.getParent(), null, true);
+ binaryReadStrategy.setPluginConfig(config);
+ binaryReadStrategy.init(localConf);
+
+ TestCollector collector = new TestCollector();
+ binaryReadStrategy.read(testFile.getAbsolutePath(), "test_table",
collector);
+
+ List<SeaTunnelRow> rows = collector.getRows();
+ Assertions.assertEquals(1, rows.size(), "Should have 1 row in complete
file mode");
+
+ SeaTunnelRow row = rows.get(0);
+ byte[] fileData = (byte[]) row.getField(0);
+ Assertions.assertEquals(2048, fileData.length, "Should read entire
file content");
+ Assertions.assertEquals("test_binary_complete.bin", row.getField(1));
+ Assertions.assertEquals(0L, row.getField(2));
+ }
+
+ private File createTestFile(String fileName, int sizeInBytes) throws
IOException {
+ File testFile = tempDir.resolve(fileName).toFile();
+
+ if (sizeInBytes > 0) {
+ try (FileOutputStream fos = new FileOutputStream(testFile)) {
+ // Create test data with a pattern for verification
+ byte[] pattern = "SEATUNNEL_TEST_DATA_".getBytes();
+ int written = 0;
+ while (written < sizeInBytes) {
+ int toWrite = Math.min(pattern.length, sizeInBytes -
written);
+ fos.write(pattern, 0, toWrite);
+ written += toWrite;
+ }
+ }
+ } else {
+ // Create empty file
+ testFile.createNewFile();
+ }
+
+ return testFile;
+ }
+
+ private Config createConfig(String filePath, Integer chunkSize, Boolean
completeFileMode) {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put("path", filePath); // Fixed: use "path" instead of
"file_path"
Review Comment:
[nitpick] Avoid hardcoding the config key string `"path"`; use
`FileBaseSourceOptions.FILE_PATH.key()` to populate the configuration map so it
stays in sync with the option definition.
```suggestion
configMap.put(FileBaseSourceOptions.FILE_PATH.key(), filePath);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]