This is an automated email from the ASF dual-hosted git repository.
voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 163a15a3c899 refactor: Add Lombok Builder annotation to
HoodieLogFormat (#17785)
163a15a3c899 is described below
commit 163a15a3c899130745071396171fe451e6e62ab1
Author: voonhous <[email protected]>
AuthorDate: Wed Jun 3 20:06:12 2026 +0800
refactor: Add Lombok Builder annotation to HoodieLogFormat (#17785)
* refactor: Add Lombok Builder annotation to HoodieLogFormat
* Addressed comments
* Addressed comments
- Restore fixed DEFAULT_SIZE_THRESHOLD as the HDFS block size in
createNewFile, decoupling it from the user-configurable log rollover threshold
- Fix stale 'file len' comment -> 'file size' and drop redundant '= 0L'
initializer on Writer.fileSize
---
.../cli/commands/TestHoodieLogFileCommand.java | 34 +-
.../timeline/versioning/v1/TimelineArchiverV1.java | 11 +-
.../org/apache/hudi/io/HoodieAppendHandle.java | 4 +-
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 13 +-
.../metadata/HoodieBackedTableMetadataWriter.java | 7 +-
.../table/action/rollback/RollbackHelperV1.java | 7 +-
.../utils/TestLegacyArchivedMetaEntryReader.java | 12 +-
.../hudi/testutils/HoodieWriteableTestTable.java | 12 +-
.../testutils/HoodieFlinkWriteableTestTable.java | 14 +-
.../hudi/common/table/log/HoodieLogFormat.java | 323 +++++++----------
.../common/table/log/HoodieLogFormatWriter.java | 114 +++---
.../common/functional/TestHoodieLogFormat.java | 402 ++++++++++++++-------
.../TestHoodieLogFormatAppendFailure.java | 22 +-
.../hudi/common/table/TestTableSchemaResolver.java | 10 +-
....java => TestHoodieLogFormatWriterBuilder.java} | 13 +-
.../table/timeline/TestArchivedTimelineV1.java | 20 +-
.../common/testutils/HoodieCommonTestHarness.java | 14 +-
.../testutils/reader/HoodieFileSliceTestUtils.java | 7 +-
.../hudi/hadoop/testutils/InputFormatTestUtil.java | 25 +-
...dieSparkMergeOnReadTableInsertUpdateDelete.java | 7 +-
.../procedure/TestShowTimelineTableProcedure.scala | 8 +-
.../apache/hudi/hive/testutils/HiveTestUtil.java | 18 +-
.../TestHoodieMetadataTableValidator.java | 7 +-
23 files changed, 610 insertions(+), 494 deletions(-)
diff --git
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index b067a6ddafc8..1df0b58a7037 100644
---
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
@@ -109,11 +110,14 @@ public class TestHoodieLogFileCommand extends
CLIFunctionalTestHarness {
Files.createDirectories(Paths.get(partitionPath));
storage = HoodieStorageUtils.getStorage(tablePath, storageConf());
- try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
- .onParentPath(new StoragePath(partitionPath))
+ try (HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+ .withParentPath(new StoragePath(partitionPath))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-log-fileid1").withInstantTime("100").withStorage(storage)
- .withSizeThreshold(1).build()) {
+ .withLogFileId("test-log-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .withSizeThreshold(1L)
+ .build()) {
// write data to file
List<HoodieRecord> records = SchemaTestUtil.generateTestRecords(0,
100).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
@@ -203,16 +207,14 @@ public class TestHoodieLogFileCommand extends
CLIFunctionalTestHarness {
partitionPath = tablePath + StoragePath.SEPARATOR +
HoodieTestCommitMetadataGenerator.DEFAULT_SECOND_PARTITION_PATH;
Files.createDirectories(Paths.get(partitionPath));
- HoodieLogFormat.Writer writer = null;
- try {
- // set little threshold to split file.
- writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(new
StoragePath(partitionPath))
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-log-fileid1").withInstantTime(INSTANT_TIME).withStorage(
- storage)
- .withSizeThreshold(500).build();
-
+ try (HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+ .withParentPath(new StoragePath(partitionPath))
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withLogFileId("test-log-fileid1")
+ .withInstantTime(INSTANT_TIME)
+ .withStorage(storage)
+ .withSizeThreshold(500L) // set little threshold to split file.
+ .build()) {
SchemaTestUtil testUtil = new SchemaTestUtil();
List<HoodieRecord> records1 = testUtil.generateHoodieTestRecords(0,
100).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
@@ -220,10 +222,6 @@ public class TestHoodieLogFileCommand extends
CLIFunctionalTestHarness {
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records1,
header, HoodieRecord.RECORD_KEY_METADATA_FIELD);
writer.appendBlock(dataBlock);
- } finally {
- if (writer != null) {
- writer.close();
- }
}
Object result = shell.evaluate(() -> "show logfile records
--logFilePathPattern "
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
index d518ac5525dd..bab78324d7de 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
@@ -31,8 +31,8 @@ import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
@@ -116,9 +116,12 @@ public class TimelineArchiverV1<T extends
HoodieAvroPayload, I, K, O> implements
private Writer openWriter(StoragePath archivePath) {
try {
if (this.writer == null) {
- return
HoodieLogFormat.newWriterBuilder().onParentPath(archivePath).withInstantTime("")
-
.withFileId(archiveFilePath.getName()).withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
- .withStorage(metaClient.getStorage()).build();
+ return HoodieLogFormatWriter.builder()
+ .withParentPath(archivePath).withInstantTime("")
+ .withLogFileId(archiveFilePath.getName())
+ .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
+ .withStorage(metaClient.getStorage())
+ .build();
} else {
return this.writer;
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 5ea8ba460f87..3b825bbf1ade 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -39,7 +39,7 @@ import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.AppendResult;
-import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
@@ -105,7 +105,7 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
// Incoming records to be written to logs.
protected Iterator<HoodieRecord<T>> recordItr;
// Writer to log into the file group's latest slice.
- protected Writer writer;
+ protected HoodieLogFormat.Writer writer;
protected final List<WriteStatus> statuses;
// Total number of records written during appending
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index b1d8e4305971..4eebea25a01e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
import org.apache.hudi.common.table.log.LogFileCreationCallback;
import org.apache.hudi.common.table.read.DeleteContext;
import org.apache.hudi.common.util.ConfigUtils;
@@ -289,9 +290,9 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends
HoodieIOHandle<T, I,
protected HoodieLogFormat.Writer createLogWriter(String instantTime, String
fileSuffix, Option<FileSlice> fileSliceOpt) {
try {
if
(config.getWriteVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
- return HoodieLogFormat.newWriterBuilder()
-
.onParentPath(FSUtils.constructAbsolutePath(hoodieTable.getMetaClient().getBasePath(),
partitionPath))
- .withFileId(fileId)
+ return HoodieLogFormatWriter.builder()
+
.withParentPath(FSUtils.constructAbsolutePath(hoodieTable.getMetaClient().getBasePath(),
partitionPath))
+ .withLogFileId(fileId)
.withInstantTime(instantTime)
.withFileSize(0L)
.withSizeThreshold(config.getLogFileMaxSize())
@@ -306,9 +307,9 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends
HoodieIOHandle<T, I,
Option<HoodieLogFile> latestLogFile = fileSliceOpt.isPresent()
? fileSliceOpt.get().getLatestLogFile()
: Option.empty();
- return HoodieLogFormat.newWriterBuilder()
-
.onParentPath(FSUtils.constructAbsolutePath(hoodieTable.getMetaClient().getBasePath(),
partitionPath))
- .withFileId(fileId)
+ return HoodieLogFormatWriter.builder()
+
.withParentPath(FSUtils.constructAbsolutePath(hoodieTable.getMetaClient().getBasePath(),
partitionPath))
+ .withLogFileId(fileId)
.withInstantTime(instantTime)
.withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
.withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index ce5a4eeb0a2f..6748c8918a46 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -60,6 +60,7 @@ import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
@@ -1197,9 +1198,9 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
final HoodieDeleteBlock block = new
HoodieDeleteBlock(Collections.emptyList(), blockHeader);
- try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
-
.onParentPath(FSUtils.constructAbsolutePath(metadataWriteConfig.getBasePath(),
relativePartitionPath))
- .withFileId(fileGroupFileId)
+ try (HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+
.withParentPath(FSUtils.constructAbsolutePath(metadataWriteConfig.getBasePath(),
relativePartitionPath))
+ .withLogFileId(fileGroupFileId)
.withInstantTime(instantTime)
.withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)
.withFileSize(0L)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java
index 79a28421751e..1b288a70e8ba 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackHelperV1.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
import org.apache.hudi.common.table.log.LogFileCreationCallback;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -311,9 +312,9 @@ public class RollbackHelperV1 extends RollbackHelper {
// Let's emit markers for rollback as well. markers are emitted
under rollback instant time.
WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime);
- HoodieLogFormat.WriterBuilder writerBuilder =
HoodieLogFormat.newWriterBuilder()
-
.onParentPath(FSUtils.constructAbsolutePath(metaClient.getBasePath(),
partitionPath))
- .withFileId(fileId)
+ HoodieLogFormatWriter.HoodieLogFormatWriterBuilder writerBuilder =
HoodieLogFormatWriter.builder()
+
.withParentPath(FSUtils.constructAbsolutePath(metaClient.getBasePath(),
partitionPath))
+ .withLogFileId(fileId)
.withLogWriteToken(CommonClientUtils.generateWriteToken(taskContextSupplier))
.withInstantTime(tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
? instantToRollback.requestedTime() :
rollbackRequest.getLatestBaseInstant()
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
index 5317583bbe9c..ac6e7df56038 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestLegacyArchivedMetaEntryReader.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.ActiveAction;
@@ -101,10 +102,13 @@ public class TestLegacyArchivedMetaEntryReader {
private HoodieLogFormat.Writer openWriter(HoodieTableMetaClient metaClient) {
try {
- return HoodieLogFormat.newWriterBuilder()
- .onParentPath(metaClient.getArchivePath())
-
.withFileId("commits").withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
- .withStorage(metaClient.getStorage()).withInstantTime("").build();
+ return HoodieLogFormatWriter.builder()
+ .withParentPath(metaClient.getArchivePath())
+ .withLogFileId("commits")
+ .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
+ .withStorage(metaClient.getStorage())
+ .withInstantTime("")
+ .build();
} catch (IOException e) {
throw new HoodieException("Unable to initialize HoodieLogFormat writer",
e);
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
index ed9f1ef17645..78e158f78608 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieWriteableTestTable.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.testutils.FileCreateUtilsLegacy;
@@ -168,10 +169,13 @@ public class HoodieWriteableTestTable extends
HoodieMetadataTestTable {
}
private Pair<String, HoodieLogFile> appendRecordsToLogFile(String
partitionPath, String fileId, List<HoodieRecord> records) throws Exception {
- try (HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder()
- .onParentPath(new StoragePath(basePath, partitionPath))
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
- .withInstantTime(currentInstantTime).withStorage(storage).build()) {
+ try (HoodieLogFormat.Writer logWriter = HoodieLogFormatWriter.builder()
+ .withParentPath(new StoragePath(basePath, partitionPath))
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withLogFileId(fileId)
+ .withInstantTime(currentInstantTime)
+ .withStorage(storage)
+ .build()) {
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME,
currentInstantTime);
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
index 09e24fbbb0fe..8d4395a50d7d 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/testutils/HoodieFlinkWriteableTestTable.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.common.util.collection.Pair;
@@ -136,10 +137,13 @@ public class HoodieFlinkWriteableTestTable extends
HoodieWriteableTestTable {
private Pair<String, HoodieLogFile>
appendRecordsToLogFile(List<HoodieRecord> groupedRecords) throws Exception {
String partitionPath = groupedRecords.get(0).getPartitionPath();
HoodieRecordLocation location = groupedRecords.get(0).getCurrentLocation();
- try (HoodieLogFormat.Writer logWriter = HoodieLogFormat.newWriterBuilder()
- .onParentPath(new StoragePath(basePath, partitionPath))
-
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
-
.withInstantTime(location.getInstantTime()).withStorage(storage).build()) {
+ try (HoodieLogFormat.Writer logWriter = HoodieLogFormatWriter.builder()
+ .withParentPath(new StoragePath(basePath, partitionPath))
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withLogFileId(location.getFileId())
+ .withInstantTime(location.getInstantTime())
+ .withStorage(storage)
+ .build()) {
Map<HeaderMetadataType, String> header = new java.util.HashMap<>();
header.put(HeaderMetadataType.INSTANT_TIME, location.getInstantTime());
header.put(HeaderMetadataType.SCHEMA, schema.toString());
@@ -150,7 +154,7 @@ public class HoodieFlinkWriteableTestTable extends
HoodieWriteableTestTable {
HoodieAvroUtils.addHoodieKeyToRecord(val, r.getRecordKey(),
r.getPartitionPath(), "");
return (IndexedRecord) val;
} catch (IOException e) {
- log.warn("Failed to convert record " + r.toString(), e);
+ log.warn("Failed to convert record {}", r, e);
return null;
}
}).map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()),
header, HoodieRecord.RECORD_KEY_METADATA_FIELD));
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
index 2c536ec07722..a675e0d9da89 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
@@ -24,13 +24,13 @@ import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import java.io.Closeable;
import java.io.IOException;
@@ -60,31 +60,140 @@ public interface HoodieLogFormat {
String DEFAULT_WRITE_TOKEN = "0-0-0";
- String DEFAULT_LOG_FORMAT_WRITER =
"org.apache.hudi.common.table.log.HoodieLogFormatWriter";
-
/**
- * Writer interface to allow appending block to this file format.
+ * Abstract base class for appending blocks to the Hoodie log format.
+ * Subclasses provide specific implementations for writing to different
storage layers.
*/
- interface Writer extends Closeable {
+ @Getter
+ @Slf4j
+ abstract class Writer implements Closeable {
+
+ // Default max log file size 512 MB
+ public static final long DEFAULT_SIZE_THRESHOLD = 512 * 1024 * 1024L;
+
+ // Buffer size
+ protected Integer bufferSize;
+ // FileSystem
+ protected HoodieStorage storage;
+ // Size threshold for the log file. Useful when used with a rolling log
appender
+ protected Long sizeThreshold;
+ // Log File extension. Could be .avro.delta or .avro.commits etc
+ protected String fileExtension;
+ // File Id
+ protected String logFileId;
+ // File Commit Time stamp
+ protected String instantTime;
+ // version number for this log file. If not specified, then the current
version will be
+ // computed by inspecting the file system
+ protected Integer logVersion;
+ // file size of this log file
+ protected Long fileSize;
+ // Location of the directory containing the log
+ protected StoragePath parentPath;
+ // Log File Write Token
+ protected String logWriteToken;
+ // optional file suffix
+ protected String suffix;
+ // file creation hook
+ protected LogFileCreationCallback fileCreationCallback;
+ protected HoodieLogFile logFile;
+
+ protected HoodieTableVersion tableVersion;
/**
- * @return the path to the current {@link HoodieLogFile} being written to.
+ * Base constructor that performs the core Hudi Log logic.
*/
- HoodieLogFile getLogFile();
+ protected Writer(
+ Integer bufferSize,
+ HoodieStorage storage,
+ StoragePath parentPath,
+ String logFileId,
+ String fileExtension,
+ String instantTime,
+ Integer logVersion,
+ String logWriteToken,
+ String suffix,
+ Long fileSize,
+ Long sizeThreshold,
+ LogFileCreationCallback fileCreationCallback,
+ HoodieTableVersion tableVersion) throws IOException {
+ log.info("Building HoodieLogFormat.Writer");
+
+ // Validation
+ ValidationUtils.checkArgument(storage != null, "Storage is not
specified");
+ ValidationUtils.checkArgument(logFileId != null, "FileID is not
specified");
+ ValidationUtils.checkArgument(instantTime != null, "Instant time is not
specified");
+ ValidationUtils.checkArgument(fileExtension != null, "File extension is
not specified");
+ ValidationUtils.checkArgument(parentPath != null, "Log file parent
location is not specified");
+
+ this.bufferSize = bufferSize != null ? bufferSize :
storage.getDefaultBufferSize();
+ this.storage = storage;
+ this.parentPath = parentPath;
+ this.logFileId = logFileId;
+ this.fileExtension = fileExtension;
+ this.instantTime = instantTime;
+ this.logVersion = logVersion;
+ this.logWriteToken = logWriteToken;
+ this.suffix = suffix;
+
+ // Defaults and logic
+ this.fileSize = fileSize != null ? fileSize : 0L;
+ this.sizeThreshold = sizeThreshold != null ? sizeThreshold :
DEFAULT_SIZE_THRESHOLD;
+ // Does nothing by default
+ this.fileCreationCallback = fileCreationCallback != null ?
fileCreationCallback : new LogFileCreationCallback() {};
+ this.tableVersion = tableVersion != null ? tableVersion :
HoodieTableVersion.current();
+
+ // Log version computation
+ if (this.logVersion == null) {
+ log.info("Computing next log version for {} in {}", logFileId,
parentPath);
+ boolean useBaseVersion =
this.tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT) &&
this.logWriteToken != null;
+
+ if (useBaseVersion) {
+ this.logVersion = HoodieLogFile.LOGFILE_BASE_VERSION;
+ } else {
+ // Compute from storage (expensive)
+ Option<Pair<Integer, String>> versionAndToken =
FSUtils.getLatestLogVersion(this.storage, this.parentPath, this.logFileId,
this.fileExtension, this.instantTime);
+ if (versionAndToken.isPresent()) {
+ this.logVersion = versionAndToken.get().getKey();
+ this.logWriteToken = versionAndToken.get().getValue();
+ } else {
+ this.logVersion = HoodieLogFile.LOGFILE_BASE_VERSION;
+ this.logWriteToken = UNKNOWN_WRITE_TOKEN;
+ }
+ }
+ }
+
+ if (this.logWriteToken == null) {
+ this.logWriteToken = UNKNOWN_WRITE_TOKEN;
+ }
+
+ if (this.suffix != null) {
+ // A little hacky to simplify the file name concatenation:
+ // patch the write token with an optional suffix
+ // instead of adding a new extension
+ this.logWriteToken = this.logWriteToken + this.suffix;
+ }
+
+ // Initialise logFile
+ StoragePath logPath = new StoragePath(parentPath,
+ FSUtils.makeLogFileName(this.logFileId, this.fileExtension,
this.instantTime, this.logVersion, this.logWriteToken));
+ log.info("HoodieLogFile on path {}", logPath);
+ this.logFile = new HoodieLogFile(logPath, this.fileSize);
+ }
/**
* Append Block to a log file.
* @return {@link AppendResult} containing result of the append.
*/
- AppendResult appendBlock(HoodieLogBlock block) throws IOException,
InterruptedException;
+ public abstract AppendResult appendBlock(HoodieLogBlock block) throws
IOException, InterruptedException;
/**
* Appends the list of blocks to a logfile.
* @return {@link AppendResult} containing result of the append.
*/
- AppendResult appendBlocks(List<HoodieLogBlock> blocks) throws IOException,
InterruptedException;
+ public abstract AppendResult appendBlocks(List<HoodieLogBlock> blocks)
throws IOException, InterruptedException;
- long getCurrentSize() throws IOException;
+ public abstract long getCurrentSize() throws IOException;
/**
* Force previously appended blocks to durable storage so that downstream
@@ -95,7 +204,7 @@ public interface HoodieLogFormat {
* mainly for tests that assert per-append visibility on the underlying
* file system.
*/
- void sync() throws IOException;
+ public abstract void sync() throws IOException;
}
/**
@@ -110,204 +219,20 @@ public interface HoodieLogFormat {
/**
* Read log file in reverse order and check if prev block is present.
- *
+ *
* @return {@code true} if previous block is present, {@code false}
otherwise.
*/
boolean hasPrev();
/**
* Read log file in reverse order and return prev block if present.
- *
+ *
* @return {@link HoodieLogBlock} the previous block
* @throws IOException
*/
HoodieLogBlock prev() throws IOException;
}
- /**
- * Builder class to construct the default log format writer.
- */
- class WriterBuilder {
-
- private static final Logger LOG =
LoggerFactory.getLogger(WriterBuilder.class);
- // Default max log file size 512 MB
- public static final long DEFAULT_SIZE_THRESHOLD = 512 * 1024 * 1024L;
-
- // Buffer size
- private Integer bufferSize;
- // FileSystem
- private HoodieStorage storage;
- // Size threshold for the log file. Useful when used with a rolling log
appender
- private Long sizeThreshold;
- // Log File extension. Could be .avro.delta or .avro.commits etc
- private String fileExtension;
- // File Id
- private String logFileId;
- // File Commit Time stamp
- private String instantTime;
- // version number for this log file. If not specified, then the current
version will be
- // computed by inspecting the file system
- private Integer logVersion;
- // file len of this log file
- private Long fileLen = 0L;
- // Location of the directory containing the log
- private StoragePath parentPath;
- // Log File Write Token
- private String logWriteToken;
- // optional file suffix
- private String suffix;
- // file creation hook
- private LogFileCreationCallback fileCreationCallback;
-
- private HoodieTableVersion tableVersion;
-
- public WriterBuilder withBufferSize(int bufferSize) {
- this.bufferSize = bufferSize;
- return this;
- }
-
- public WriterBuilder withLogWriteToken(String logWriteToken) {
- this.logWriteToken = logWriteToken;
- return this;
- }
-
- public WriterBuilder withSuffix(String suffix) {
- this.suffix = suffix;
- return this;
- }
-
- public WriterBuilder withStorage(HoodieStorage storage) {
- this.storage = storage;
- return this;
- }
-
- public WriterBuilder withSizeThreshold(long sizeThreshold) {
- this.sizeThreshold = sizeThreshold;
- return this;
- }
-
- public WriterBuilder withFileExtension(String logFileExtension) {
- this.fileExtension = logFileExtension;
- return this;
- }
-
- public WriterBuilder withFileId(String fileId) {
- this.logFileId = fileId;
- return this;
- }
-
- public WriterBuilder withInstantTime(String instantTime) {
- this.instantTime = instantTime;
- return this;
- }
-
- public WriterBuilder withLogVersion(int version) {
- this.logVersion = version;
- return this;
- }
-
- public WriterBuilder withFileSize(long fileLen) {
- this.fileLen = fileLen;
- return this;
- }
-
- public WriterBuilder onParentPath(StoragePath parentPath) {
- this.parentPath = parentPath;
- return this;
- }
-
- public WriterBuilder withFileCreationCallback(LogFileCreationCallback
fileCreationCallback) {
- this.fileCreationCallback = fileCreationCallback;
- return this;
- }
-
- public WriterBuilder withTableVersion(HoodieTableVersion
writeTableVersion) {
- this.tableVersion = writeTableVersion;
- return this;
- }
-
- public Writer build() throws IOException {
- LOG.info("Building HoodieLogFormat Writer");
- if (storage == null) {
- throw new IllegalArgumentException("fs is not specified");
- }
- if (logFileId == null) {
- throw new IllegalArgumentException("FileID is not specified");
- }
- if (instantTime == null) {
- throw new IllegalArgumentException("Instant time is not specified");
- }
- if (fileExtension == null) {
- throw new IllegalArgumentException("File extension is not specified");
- }
- if (parentPath == null) {
- throw new IllegalArgumentException("Log file parent location is not
specified");
- }
-
- if (fileCreationCallback == null) {
- // by default does nothing.
- fileCreationCallback = new LogFileCreationCallback() {};
- }
-
- if (tableVersion == null) {
- tableVersion = HoodieTableVersion.current();
- }
-
- if (logVersion == null) {
- LOG.info("Computing the next log version for {} in {}", logFileId,
parentPath);
- boolean useBaseVersion =
tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)
- && logWriteToken != null;
- if (useBaseVersion) {
- // the log format writer handles the existence check.
- logVersion = HoodieLogFile.LOGFILE_BASE_VERSION;
- } else {
- // compute from storage (expensive)
- Option<Pair<Integer, String>> versionAndWriteToken =
- FSUtils.getLatestLogVersion(storage, parentPath, logFileId,
fileExtension, instantTime);
- if (versionAndWriteToken.isPresent()) {
- logVersion = versionAndWriteToken.get().getKey();
- logWriteToken = versionAndWriteToken.get().getValue();
- } else {
- // this is the case where there is no existing log-file.
- logVersion = HoodieLogFile.LOGFILE_BASE_VERSION;
- logWriteToken = UNKNOWN_WRITE_TOKEN;
- }
- }
- LOG.info("Computed the next log version for {} in {} as {} with
write-token {}", logFileId, parentPath, logVersion, logWriteToken);
- }
-
- if (logWriteToken == null) {
- fileLen = 0L;
- logWriteToken = UNKNOWN_WRITE_TOKEN;
- }
-
- if (suffix != null) {
- // A little hacky to simplify the file name concatenation:
- // patch the write token with an optional suffix
- // instead of adding a new extension
- logWriteToken = logWriteToken + suffix;
- }
-
- StoragePath logPath = new StoragePath(parentPath,
- FSUtils.makeLogFileName(logFileId, fileExtension, instantTime,
logVersion, logWriteToken));
- LOG.info("HoodieLogFile on path {}", logPath);
- HoodieLogFile logFile = new HoodieLogFile(logPath, fileLen);
-
- if (sizeThreshold == null) {
- sizeThreshold = DEFAULT_SIZE_THRESHOLD;
- }
- return (Writer) ReflectionUtils.loadClass(
- DEFAULT_LOG_FORMAT_WRITER,
- new Class[] {HoodieStorage.class, HoodieLogFile.class,
Integer.class, Short.class, Long.class, String.class,
LogFileCreationCallback.class},
- storage, logFile, bufferSize, null, sizeThreshold, logWriteToken,
fileCreationCallback
- );
- }
- }
-
- static WriterBuilder newWriterBuilder() {
- return new WriterBuilder();
- }
-
static HoodieLogFormat.Reader newReader(HoodieStorage storage, HoodieLogFile
logFile, HoodieSchema readerSchema)
throws IOException {
return new HoodieLogFileReader(storage, logFile, readerSchema,
HoodieLogFileReader.DEFAULT_BUFFER_SIZE);
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index 1d6acc8d85ef..2aed1d7dd87a 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -19,13 +19,14 @@
package org.apache.hudi.common.table.log;
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.table.log.HoodieLogFormat.WriterBuilder;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -42,38 +43,36 @@ import java.util.List;
/**
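- * HoodieLogFormatWriter can be used to append blocks to a log file Use
HoodieLogFormat.WriterBuilder to construct.
+ * HoodieLogFormatWriter can be used to append blocks to a log file. Use
HoodieLogFormatWriter.builder() to construct.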
*/
+@Getter
@Slf4j
-public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
+public class HoodieLogFormatWriter extends HoodieLogFormat.Writer {
- @Getter
- private HoodieLogFile logFile;
- private FSDataOutputStream output;
-
- private final HoodieStorage storage;
- @Getter
- private final long sizeThreshold;
- private final Integer bufferSize;
private final Short replication;
- private final String rolloverLogWriteToken;
- private final LogFileCreationCallback fileCreationHook;
+ private FSDataOutputStream outputStream;
private boolean closed = false;
private transient Thread shutdownThread = null;
- public HoodieLogFormatWriter(
- HoodieStorage storage,
- HoodieLogFile logFile,
+ @Builder(setterPrefix = "with")
+ private HoodieLogFormatWriter(
Integer bufferSize,
- Short replication,
+ HoodieStorage storage,
+ StoragePath parentPath,
+ String logFileId,
+ String fileExtension,
+ String instantTime,
+ Integer logVersion,
+ String logWriteToken,
+ String suffix,
+ Long fileSize,
Long sizeThreshold,
- String rolloverLogWriteToken,
- LogFileCreationCallback fileCreationHook) {
- this.storage = storage;
- this.logFile = logFile;
- this.sizeThreshold = sizeThreshold;
- this.bufferSize = bufferSize != null ? bufferSize :
storage.getDefaultBufferSize();
+ LogFileCreationCallback fileCreationCallback,
+ HoodieTableVersion tableVersion,
+ Short replication
+ ) throws IOException {
+ super(bufferSize, storage, parentPath, logFileId, fileExtension,
instantTime, logVersion, logWriteToken,
+ suffix, fileSize, sizeThreshold, fileCreationCallback, tableVersion);
+ // outputStream is not initialized here; it is lazily initialized in
getOutputStream()
this.replication = replication != null ? replication :
storage.getDefaultReplication(logFile.getPath().getParent());
- this.rolloverLogWriteToken = rolloverLogWriteToken;
- this.fileCreationHook = fileCreationHook;
addShutDownHook();
}
@@ -81,8 +80,8 @@ public class HoodieLogFormatWriter implements
HoodieLogFormat.Writer {
* Overrides the output stream, only for test purpose.
*/
@VisibleForTesting
- public void withOutputStream(FSDataOutputStream output) {
- this.output = output;
+ public void withOutputStream(FSDataOutputStream outputStream) {
+ this.outputStream = outputStream;
}
/**
@@ -91,7 +90,7 @@ public class HoodieLogFormatWriter implements
HoodieLogFormat.Writer {
* @throws IOException
*/
private FSDataOutputStream getOutputStream() throws IOException {
- if (this.output == null) {
+ if (outputStream == null) {
boolean created = false;
while (!created) {
try {
@@ -116,7 +115,7 @@ public class HoodieLogFormatWriter implements
HoodieLogFormat.Writer {
}
}
}
- return output;
+ return outputStream;
}
@Override
@@ -207,23 +206,32 @@ public class HoodieLogFormatWriter implements
HoodieLogFormat.Writer {
private void rolloverIfNeeded() throws IOException {
// Roll over if the size is past the threshold
- if (getCurrentSize() > sizeThreshold) {
- log.info("CurrentSize {} has reached threshold {}. Rolling over to the
next version", getCurrentSize(), sizeThreshold);
+ if (getCurrentSize() > getSizeThreshold()) {
+ log.info("CurrentSize {} has reached threshold {}. Rolling over to the
next version", getCurrentSize(), getSizeThreshold());
rollOver();
}
}
private void rollOver() throws IOException {
closeStream();
- this.logFile = logFile.rollOver(rolloverLogWriteToken);
+ this.logFile = getLogFile().rollOver(getLogWriteToken());
this.closed = false;
}
private void createNewFile() throws IOException {
- fileCreationHook.preFileCreation(this.logFile);
- this.output = new FSDataOutputStream(
- storage.create(this.logFile.getPath(), false, bufferSize, replication,
WriterBuilder.DEFAULT_SIZE_THRESHOLD),
- new FileSystem.Statistics(storage.getScheme())
+ getFileCreationCallback().preFileCreation(this.getLogFile());
+ this.outputStream = new FSDataOutputStream(
+ getStorage().create(
+ this.getLogFile().getPath(),
+ false,
+ getBufferSize(),
+ getReplication(),
+ // HDFS block size is intentionally a fixed constant, independent
of the
+ // log rollover threshold (getSizeThreshold()). A small rollover
threshold
+ // must not shrink the underlying file's block size below HDFS
limits.
+ DEFAULT_SIZE_THRESHOLD
+ ),
+ new FileSystem.Statistics(getStorage().getScheme())
);
}
@@ -237,25 +245,25 @@ public class HoodieLogFormatWriter implements
HoodieLogFormat.Writer {
}
private void closeStream() throws IOException {
- if (output != null) {
+ if (outputStream != null) {
// Persist all buffered data to DataNodes before closing so downstream
// readers can observe a fully-written log file at commit-level
visibility.
sync();
- output.close();
- output = null;
+ outputStream.close();
+ outputStream = null;
closed = true;
}
}
@Override
public void sync() throws IOException {
- if (output == null) {
+ if (outputStream == null) {
return; // Presume closed
}
- output.flush();
+ outputStream.flush();
// NOTE: the following API call makes sure that the data is flushed to
disk on DataNodes (akin to POSIX fsync())
// See more details here: https://issues.apache.org/jira/browse/HDFS-744
- output.hsync();
+ outputStream.hsync();
}
@Override
@@ -264,27 +272,25 @@ public class HoodieLogFormatWriter implements
HoodieLogFormat.Writer {
throw new IllegalStateException("Cannot get current size as the
underlying stream has been closed already");
}
- if (output == null) {
+ if (outputStream == null) {
return 0;
}
- return output.getPos();
+ return outputStream.getPos();
}
/**
* Close the output stream when the JVM exits.
*/
private void addShutDownHook() {
- shutdownThread = new Thread() {
- public void run() {
- try {
- log.info("running HoodieLogFormatWriter shutdown hook to close
output stream for log file: {}", logFile);
- closeStream();
- } catch (Exception e) {
- log.warn("unable to close output stream for log file: {}", logFile,
e);
- // fail silently for any sort of exception
- }
+ shutdownThread = new Thread(() -> {
+ try {
+ log.info("Running HoodieLogFormatWriter shutdown hook to close output
stream for log file: {}", logFile);
+ closeStream();
+ } catch (Exception e) {
+ log.warn("Unable to close output stream for log file: {}", logFile, e);
+ // fail silently for any sort of exception
}
- };
+ });
Runtime.getRuntime().addShutdownHook(shutdownThread);
}
}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 2a3b2ced35c8..e63a27149d3c 100755
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -44,7 +44,6 @@ import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFileReader;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
-import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.TestLogReaderUtils;
@@ -207,10 +206,14 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@Test
public void testEmptyLog() throws IOException {
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
assertEquals(0, writer.getCurrentSize(), "Just created this log, size
should be 0");
assertTrue(writer.getLogFile().getFileName().startsWith("."), "Check all
log files should start with a .");
assertEquals(1, writer.getLogFile().getLogVersion(), "Version should be 1
for new log created");
@@ -220,15 +223,18 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@ParameterizedTest
@EnumSource(names = {"AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK",
"PARQUET_DATA_BLOCK"})
public void testBasicAppend(HoodieLogBlockType dataBlockType) throws
IOException, InterruptedException, URISyntaxException {
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
Map<HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA,
getSimpleSchema().toString());
- long pos = writer.getCurrentSize();
HoodieDataBlock dataBlock = getDataBlock(dataBlockType, records, header);
AppendResult result = writer.appendBlock(dataBlock);
@@ -245,10 +251,14 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@Test
public void testRollover() throws IOException, InterruptedException,
URISyntaxException {
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
@@ -265,10 +275,14 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
// Create a writer with the size threshold as the size we just wrote - so
this has to roll
writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage)
- .withSizeThreshold(size - 1).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .withSizeThreshold(size - 1)
+ .build();
records = SchemaTestUtil.generateTestRecords(0, 100);
dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records, header);
AppendResult secondAppend = writer.appendBlock(dataBlock);
@@ -306,14 +320,19 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
}
private void testConcurrentAppend(boolean logFileExists, boolean
newLogFileFormat) throws Exception {
- HoodieLogFormat.WriterBuilder builder1 =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
-
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
- .withInstantTime("100").withStorage(storage);
- HoodieLogFormat.WriterBuilder builder2 =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
-
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
- .withInstantTime("100").withStorage(storage);
+ HoodieLogFormatWriter.HoodieLogFormatWriterBuilder builder1 =
HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage);
+
+ HoodieLogFormatWriter.HoodieLogFormatWriterBuilder builder2 =
HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage);
if (newLogFileFormat && logFileExists) {
// Assume there is an existing log-file with write token
@@ -330,14 +349,14 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
} else {
builder1 =
builder1.withLogVersion(1).withLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN);
}
- Writer writer = builder1.build();
+ HoodieLogFormat.Writer writer = builder1.build();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA,
getSimpleSchema().toString());
HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records,
header);
writer.appendBlock(dataBlock);
- Writer writer2 = builder2.build();
+ HoodieLogFormat.Writer writer2 = builder2.build();
writer2.appendBlock(dataBlock);
HoodieLogFile logFile1 = writer.getLogFile();
HoodieLogFile logFile2 = writer2.getLogFile();
@@ -350,11 +369,15 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@ParameterizedTest
@EnumSource(names = {"AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK",
"PARQUET_DATA_BLOCK"})
public void testMultipleAppend(HoodieLogBlockType dataBlockType) throws
IOException, URISyntaxException, InterruptedException {
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withLogVersion(1).withInstantTime("100")
- .withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withLogVersion(1)
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
@@ -365,10 +388,14 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
writer.close();
writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withLogVersion(1).withInstantTime("100")
- .withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withLogVersion(1)
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
((HoodieLogFormatWriter) writer).withOutputStream((FSDataOutputStream)
storage.append(writer.getLogFile().getPath()));
records = SchemaTestUtil.generateTestRecords(0, 100);
@@ -384,10 +411,14 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
// Close and Open again and append 100 more records
writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withLogVersion(1).withInstantTime("100")
- .withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withLogVersion(1)
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
((HoodieLogFormatWriter) writer).withOutputStream(
(FSDataOutputStream) storage.append(writer.getLogFile().getPath()));
records = SchemaTestUtil.generateTestRecords(0, 100);
@@ -402,7 +433,7 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
writer.close();
// Cannot get the current size after closing the log
- final Writer closedWriter = writer;
+ final HoodieLogFormat.Writer closedWriter = writer;
assertThrows(IllegalStateException.class, closedWriter::getCurrentSize,
"getCurrentSize should fail after the logAppender is closed");
}
@@ -424,8 +455,10 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
HoodieDataBlock dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records,
header);
for (int i = 0; i < 2; i++) {
- Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
-
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits")
+ HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+ .withParentPath(testPath)
+ .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
+ .withLogFileId("commits")
.withInstantTime("")
.withStorage(localStorage).build();
writer.appendBlock(dataBlock);
@@ -440,11 +473,12 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@ParameterizedTest
@ValueSource(ints = {6, 8})
public void testBasicWriteAndScan(int tableVersion) throws IOException,
URISyntaxException, InterruptedException {
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
.withTableVersion(HoodieTableVersion.fromVersionCode(tableVersion))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+
.withLogFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
HoodieSchema schema = getSimpleSchema();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
List<IndexedRecord> copyOfRecords = records.stream()
@@ -475,10 +509,12 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@Test
public void testHugeLogFileWrite() throws IOException, URISyntaxException,
InterruptedException {
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage)
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
.withSizeThreshold(3L * 1024 * 1024 * 1024)
.build();
HoodieSchema schema = getSimpleSchema();
@@ -525,10 +561,10 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@ParameterizedTest
@EnumSource(names = {"AVRO_DATA_BLOCK", "HFILE_DATA_BLOCK",
"PARQUET_DATA_BLOCK"})
public void testBasicAppendAndRead(HoodieLogBlockType dataBlockType) throws
IOException, URISyntaxException, InterruptedException {
- Writer writer = HoodieLogFormat.newWriterBuilder()
- .onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
- .withFileId("test-fileid1")
+ .withLogFileId("test-fileid1")
.withInstantTime("100")
.withStorage(storage)
.build();
@@ -543,10 +579,10 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
writer.appendBlock(dataBlock);
writer.close();
- writer = HoodieLogFormat.newWriterBuilder()
- .onParentPath(partitionPath)
+ writer = HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
- .withFileId("test-fileid1")
+ .withLogFileId("test-fileid1")
.withInstantTime("100")
.withStorage(storage)
.build();
@@ -562,10 +598,10 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
writer.close();
// Close and Open again and append 100 more records
- writer = HoodieLogFormat.newWriterBuilder()
- .onParentPath(partitionPath)
+ writer = HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
- .withFileId("test-fileid1")
+ .withLogFileId("test-fileid1")
.withInstantTime("100")
.withStorage(storage)
.build();
@@ -614,10 +650,10 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@Test
public void testCDCBlock() throws IOException, InterruptedException {
- Writer writer = HoodieLogFormat.newWriterBuilder()
- .onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
- .withFileId("test-fileid1")
+ .withLogFileId("test-fileid1")
.withInstantTime("100")
.withStorage(storage)
.build();
@@ -1062,10 +1098,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
}
private HoodieLogFile addValidBlock(String fileId, String commitTime, int
numRecords) throws IOException, URISyntaxException, InterruptedException {
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId(fileId).withInstantTime(commitTime).withStorage(storage).build();
+ .withLogFileId(fileId)
+ .withInstantTime(commitTime)
+ .withStorage(storage)
+ .build();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0,
numRecords);
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
@@ -1079,10 +1118,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
private HoodieLogFile appendValidBlock(StoragePath path, String fileId,
String commitTime,
int numRecords)
throws IOException, URISyntaxException, InterruptedException {
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId(fileId).withInstantTime(commitTime).withStorage(storage).build();
+ .withLogFileId(fileId)
+ .withInstantTime(commitTime)
+ .withStorage(storage)
+ .build();
((HoodieLogFormatWriter) writer).withOutputStream(
(FSDataOutputStream) storage.append(path));
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0,
numRecords);
@@ -1097,10 +1139,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@Test
public void testValidateCorruptBlockEndPosition() throws IOException,
URISyntaxException, InterruptedException {
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
@@ -1153,11 +1198,14 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
throws IOException, URISyntaxException, InterruptedException {
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage)
- .withSizeThreshold(500).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .withSizeThreshold(500L)
+ .build();
SchemaTestUtil testUtil = new SchemaTestUtil();
// Write 1
@@ -1197,10 +1245,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
throws IOException, URISyntaxException, InterruptedException {
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
// Write 1
SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -1260,10 +1311,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
throws IOException, URISyntaxException, InterruptedException {
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
// Write 1
SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -1298,9 +1352,12 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
outputStream.close();
writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
// Write 3
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
List<IndexedRecord> records3 = testUtil.generateHoodieTestRecords(0, 100);
@@ -1330,10 +1387,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
throws IOException, URISyntaxException, InterruptedException {
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
// Write 1
SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -1471,10 +1531,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
String fileId = "test-fileid111";
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId(fileId).withInstantTime("100").withStorage(storage).build();
+ .withLogFileId(fileId)
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
// Write 1 -> 100 records are written
SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -1579,10 +1642,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
throws IOException, URISyntaxException, InterruptedException {
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
// Write 1
SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -1712,10 +1778,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
// Write a Data block and Delete block with same InstantTime (written in
same batch)
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
// Write 1
SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -1781,10 +1850,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
// Write a Data block and Delete block with same InstantTime (written in
same batch)
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
// Write 1
SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -1833,10 +1905,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
throws IOException, URISyntaxException, InterruptedException {
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
// Write 1
SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -1870,10 +1945,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
// Write a 3 Data blocs with same InstantTime (written in same batch)
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
// Write 1
SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -1924,10 +2002,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
throws IOException, URISyntaxException, InterruptedException {
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
List<String> deleteKeyListInV2Block = Arrays.asList(
"d448e1b8-a0d4-45c0-bf2d-a9e16ff3c8ce",
"df3f71cd-5b68-406c-bb70-861179444adb",
@@ -2047,10 +2128,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
throws IOException, URISyntaxException, InterruptedException {
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
// Write 1
SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -2107,10 +2191,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
// Write a 3 Data blocs with same InstantTime (written in same batch)
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
// Write 1
SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -2152,9 +2239,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
outputStream.close();
writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
writer.appendBlock(dataBlock);
writer.close();
@@ -2172,9 +2263,12 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
outputStream.close();
writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
// Write 1 rollback block for the last commit instant
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
header.put(HeaderMetadataType.TARGET_INSTANT_TIME, "100");
@@ -2202,10 +2296,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
// Write a 3 Data blocks with same InstantTime (written in same batch)
HoodieSchema schema =
HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
// Write 1st data blocks multiple times.
SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -2275,9 +2372,12 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
outputStream.close();
writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
((HoodieLogFormatWriter) writer).withOutputStream(
(FSDataOutputStream) storage.append(writer.getLogFile().getPath()));
@@ -2405,9 +2505,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
List<IndexedRecord> records2 = new ArrayList<>(records);
// Write1 with numRecordsInLog1 records written to log.1
- Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
-
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
- .withInstantTime("100").withStorage(storage).build();
+ HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
@@ -2419,9 +2523,14 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
writer.close();
// write2 with numRecordsInLog2 records written to log.2
- Writer writer2 =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
-
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
- .withInstantTime("100").withStorage(storage).withSizeThreshold(size
- 1).build();
+ HoodieLogFormat.Writer writer2 = HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .withSizeThreshold(size - 1)
+ .build();
Map<HoodieLogBlock.HeaderMetadataType, String> header2 = new HashMap<>();
header2.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
@@ -2498,10 +2607,14 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@Test
public void testBasicAppendAndReadInReverse()
throws IOException, URISyntaxException, InterruptedException {
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
HoodieSchema schema = getSimpleSchema();
List<IndexedRecord> records1 = SchemaTestUtil.generateTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
@@ -2568,10 +2681,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@Test
public void testAppendAndReadOnCorruptedLogInReverse()
throws IOException, URISyntaxException, InterruptedException {
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
HoodieSchema schema = getSimpleSchema();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
@@ -2602,9 +2718,12 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
// Should be able to append a new block
writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
((HoodieLogFormatWriter) writer).withOutputStream(
(FSDataOutputStream) storage.append(writer.getLogFile().getPath()));
records = SchemaTestUtil.generateTestRecords(0, 100);
@@ -2630,10 +2749,13 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
@Test
public void testBasicAppendAndTraverseInReverse()
throws IOException, URISyntaxException, InterruptedException {
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder().withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
HoodieSchema schema = getSimpleSchema();
List<IndexedRecord> records1 = SchemaTestUtil.generateTestRecords(0, 100);
List<IndexedRecord> copyOfRecords1 = records1.stream()
@@ -2717,10 +2839,10 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
public void testDataBlockFormatAppendAndReadWithProjectedSchema(
HoodieLogBlockType dataBlockType
) throws IOException, URISyntaxException, InterruptedException {
- Writer writer = HoodieLogFormat.newWriterBuilder()
- .onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
- .withFileId("test-fileid1")
+ .withLogFileId("test-fileid1")
.withInstantTime("100")
.withStorage(storage)
.build();
@@ -2862,10 +2984,14 @@ public class TestHoodieLogFormat extends
HoodieCommonTestHarness {
private HoodieLogFormat.Reader createCorruptedFile(String fileId) throws
Exception {
// block is corrupted, but check is skipped.
- Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+ HoodieLogFormat.Writer writer =
+ HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId(fileId).withInstantTime("100").withStorage(storage).build();
+ .withLogFileId(fileId)
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
index 7596c17d5209..125527541b65 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormatAppendFailure.java
@@ -21,8 +21,8 @@ package org.apache.hudi.common.functional;
import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -120,9 +120,13 @@ public class TestHoodieLogFormatAppendFailure {
HoodieAvroDataBlock dataBlock =
new HoodieAvroDataBlock(records, header,
HoodieRecord.RECORD_KEY_METADATA_FIELD);
- Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
-
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits")
- .withInstantTime("").withStorage(storage).build();
+ Writer writer = HoodieLogFormatWriter.builder()
+ .withParentPath(testPath)
+ .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
+ .withLogFileId("commits")
+ .withInstantTime("")
+ .withStorage(storage)
+ .build();
writer.appendBlock(dataBlock);
// get the current log file version to compare later
@@ -152,9 +156,13 @@ public class TestHoodieLogFormatAppendFailure {
// Opening a new Writer right now will throw IOException. The code should
handle this, rollover the logfile and
// return a new writer with a bumped up logVersion
- writer = HoodieLogFormat.newWriterBuilder().onParentPath(testPath)
-
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION).withFileId("commits")
- .withInstantTime("").withStorage(storage).build();
+ writer = HoodieLogFormatWriter.builder()
+ .withParentPath(testPath)
+ .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
+ .withLogFileId("commits")
+ .withInstantTime("")
+ .withStorage(storage)
+ .build();
header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.COMMAND_BLOCK_TYPE,
String.valueOf(HoodieCommandBlock.HoodieCommandBlockTypeEnum.ROLLBACK_BLOCK.ordinal()));
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index 193bb0173dfd..f2c6d2b49b20 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -195,8 +196,13 @@ class TestTableSchemaResolver {
private StoragePath writeLogFile(StoragePath partitionPath, Schema schema)
throws IOException, URISyntaxException, InterruptedException {
HoodieStorage storage = HoodieTestUtils.getStorage(partitionPath);
HoodieLogFormat.Writer writer =
-
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
-
.withFileId("test-fileid1").withInstantTime("100").withStorage(storage).build();
+ HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withLogFileId("test-fileid1")
+ .withInstantTime("100")
+ .withStorage(storage)
+ .build();
List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieLogWriterBuilder.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieLogFormatWriterBuilder.java
similarity index 87%
rename from
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieLogWriterBuilder.java
rename to
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieLogFormatWriterBuilder.java
index e362a033cfe4..c7ed090fb85f 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieLogWriterBuilder.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/log/block/TestHoodieLogFormatWriterBuilder.java
@@ -23,6 +23,7 @@ package org.apache.hudi.common.table.log.block;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
@@ -38,21 +39,21 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
/**
- * Test class for {@link HoodieLogFormat#newWriterBuilder()}.
+ * Test class for {@link HoodieLogFormatWriter#builder()}.
*/
-public class TestHoodieLogWriterBuilder {
+public class TestHoodieLogFormatWriterBuilder {
- HoodieLogFormat.WriterBuilder builder;
+ HoodieLogFormatWriter.HoodieLogFormatWriterBuilder builder;
HoodieLogFormat.Writer writer;
HoodieStorage storage;
@BeforeEach
public void setup() {
storage = mock(HoodieStorage.class);
- builder = HoodieLogFormat.newWriterBuilder()
- .withFileId("test-fileid1")
+ builder = HoodieLogFormatWriter.builder()
+ .withLogFileId("test-fileid1")
.withInstantTime("100")
- .onParentPath(new StoragePath("/tmp"))
+ .withParentPath(new StoragePath("/tmp"))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
.withStorage(storage);
}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
index 3142338119a9..d576087bb3f1 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
@@ -40,6 +40,7 @@ import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import
org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
@@ -835,16 +836,23 @@ public class TestArchivedTimelineV1 extends
HoodieCommonTestHarness {
}
private Writer buildWriter(StoragePath archiveFilePath) throws IOException {
- return
HoodieLogFormat.newWriterBuilder().onParentPath(archiveFilePath.getParent())
-
.withFileId(archiveFilePath.getName()).withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
- .withStorage(metaClient.getStorage()).withInstantTime("").build();
+ return HoodieLogFormatWriter.builder()
+ .withParentPath(archiveFilePath.getParent())
+ .withLogFileId(archiveFilePath.getName())
+ .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
+ .withStorage(metaClient.getStorage())
+ .withInstantTime("")
+ .build();
}
private Writer buildWriter(StoragePath archiveFilePath, int logVersion)
throws IOException {
- return
HoodieLogFormat.newWriterBuilder().onParentPath(archiveFilePath.getParent())
-
.withFileId(archiveFilePath.getName()).withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
+ return
HoodieLogFormatWriter.builder().withParentPath(archiveFilePath.getParent())
+ .withLogFileId(archiveFilePath.getName())
+ .withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
.withLogVersion(logVersion)
- .withStorage(metaClient.getStorage()).withInstantTime("").build();
+ .withStorage(metaClient.getStorage())
+ .withInstantTime("")
+ .build();
}
private void writeArchiveLog(Writer writer, List<IndexedRecord> records)
throws Exception {
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index a3602b3ff9e8..ed6a6be4105f 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -358,12 +358,14 @@ public class HoodieCommonTestHarness {
String commitTime,
String
logBlockInstantTime)
throws IOException, InterruptedException {
- HoodieLogFormat.Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
- .withSizeThreshold(1024).withFileId(fileId)
- .withInstantTime(commitTime)
- .withStorage(storage).build();
+ HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+ .withParentPath(partitionPath)
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withSizeThreshold(1024L)
+ .withLogFileId(fileId)
+ .withInstantTime(commitTime)
+ .withStorage(storage)
+ .build();
if (storage.exists(writer.getLogFile().getPath())) {
// enable append for reader test.
((HoodieLogFormatWriter) writer).withOutputStream(
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
similarity index 98%
rename from
hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
rename to
hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
index 71e3fc670681..d0f5bde6f41e 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileSliceTestUtils.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCDCDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -288,10 +289,10 @@ public class HoodieFileSliceTestUtils {
Map<String, Long> keyToPositionMap
) throws InterruptedException, IOException {
try (HoodieLogFormat.Writer writer =
- HoodieLogFormat.newWriterBuilder()
- .onParentPath(new StoragePath(logFilePath).getParent())
+ HoodieLogFormatWriter.builder()
+ .withParentPath(new StoragePath(logFilePath).getParent())
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
- .withFileId(fileId)
+ .withLogFileId(fileId)
.withInstantTime(logInstantTime)
.withLogVersion(version)
.withStorage(storage).build()) {
diff --git
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index 274b2e21ac2b..06f8113b5ee6 100644
---
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -371,10 +372,14 @@ public class InputFormatTestUtil {
int logVersion)
throws InterruptedException, IOException {
HoodieLogFormat.Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(new
StoragePath(partitionDir.getPath()))
- .withFileId(fileId)
-
.withInstantTime(baseCommit).withStorage(storage).withLogVersion(logVersion)
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
+ HoodieLogFormatWriter.builder()
+ .withParentPath(new StoragePath(partitionDir.getPath()))
+ .withLogFileId(fileId)
+ .withInstantTime(baseCommit)
+ .withStorage(storage)
+ .withLogVersion(logVersion)
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .build();
// generate metadata
Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit);
@@ -406,8 +411,10 @@ public class InputFormatTestUtil {
HoodieLogBlock.HoodieLogBlockType logBlockType)
throws InterruptedException, IOException {
HoodieLogFormat.Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(new
StoragePath(partitionDir.getPath()))
-
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
+ HoodieLogFormatWriter.builder()
+ .withParentPath(new StoragePath(partitionDir.getPath()))
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withLogFileId(fileId)
.withLogVersion(logVersion)
.withInstantTime(newCommit)
.withStorage(storage)
@@ -444,8 +451,10 @@ public class InputFormatTestUtil {
String
oldCommit, int logVersion)
throws InterruptedException, IOException {
HoodieLogFormat.Writer writer =
- HoodieLogFormat.newWriterBuilder().onParentPath(new
StoragePath(partitionDir.getPath()))
-
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId)
+ HoodieLogFormatWriter.builder()
+ .withParentPath(new StoragePath(partitionDir.getPath()))
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withLogFileId(fileId)
.withInstantTime(baseCommit)
.withLogVersion(logVersion).withStorage(storage).build();
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index d1f21ddb3c4d..fcfbb2eb9832 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.AppendResult;
import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
import org.apache.hudi.common.table.log.LogFileCreationCallback;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -396,11 +397,11 @@ public class
TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
final WriteMarkers writeMarkers =
WriteMarkersFactory.get(config.getMarkersType(),
HoodieSparkTable.create(config, context()), newCommitTime);
- HoodieLogFormat.Writer fakeLogWriter = HoodieLogFormat.newWriterBuilder()
- .onParentPath(
+ HoodieLogFormat.Writer fakeLogWriter = HoodieLogFormatWriter.builder()
+ .withParentPath(
FSUtils.constructAbsolutePath(config.getBasePath(),
correctWriteStat.getPartitionPath()))
- .withFileId(correctWriteStat.getFileId())
+ .withLogFileId(correctWriteStat.getFileId())
.withInstantTime(newCommitTime)
.withLogVersion(correctLogFile.getLogVersion())
.withFileSize(0L)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowTimelineTableProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowTimelineTableProcedure.scala
index c1e84cca83f1..c21b0ec8713b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowTimelineTableProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowTimelineTableProcedure.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.common.engine.{HoodieEngineContext,
HoodieLocalEngineCont
import org.apache.hudi.common.engine.LocalTaskContextSupplier
import org.apache.hudi.common.model.{ActionType, HoodieArchivedLogFile,
HoodieAvroIndexedRecord, HoodieCommitMetadata, HoodieLogFile, HoodieRecord,
WriteOperationType}
import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
-import org.apache.hudi.common.table.log.HoodieLogFormat
+import org.apache.hudi.common.table.log.{HoodieLogFormat,
HoodieLogFormatWriter}
import org.apache.hudi.common.table.log.block.{HoodieAvroDataBlock,
HoodieLogBlock}
import org.apache.hudi.common.table.timeline.{ActiveAction, HoodieInstant,
HoodieTimeline}
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
@@ -398,9 +398,9 @@ class TestShowTimelineTableProcedure extends
HoodieSparkSqlTestBase {
storage.createDirectory(archivePath)
}
- val writer = HoodieLogFormat.newWriterBuilder()
- .onParentPath(archiveFilePath.getParent())
- .withFileId(archiveFilePath.getName())
+ val writer = HoodieLogFormatWriter.builder()
+ .withParentPath(archiveFilePath.getParent())
+ .withLogFileId(archiveFilePath.getName())
.withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
.withStorage(storage)
.withInstantTime("")
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index 24e76b2d9f69..0e603857f26d 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -39,8 +39,8 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
@@ -725,8 +725,10 @@ public class HiveTestUtil {
HoodieSchema schema = getTestDataSchema(isLogSchemaSimple);
HoodieBaseFile dataFile = new
HoodieBaseFile(storage.getPathInfo(parquetFilePath));
// Write a log file for this parquet file
- Writer logWriter =
HoodieLogFormat.newWriterBuilder().onParentPath(parquetFilePath.getParent())
-
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(dataFile.getFileId())
+ Writer logWriter = HoodieLogFormatWriter.builder()
+ .withParentPath(parquetFilePath.getParent())
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withLogFileId(dataFile.getFileId())
.withInstantTime(dataFile.getCommitTime()).withStorage(storage).build();
List<HoodieRecord> records = (isLogSchemaSimple ?
SchemaTestUtil.generateTestRecords(0, 100)
: SchemaTestUtil.generateEvolvedTestRecords(100, 100)).stream()
@@ -745,9 +747,13 @@ public class HiveTestUtil {
HoodieSchema schema = SchemaTestUtil.getSchema(logSchemaPath);
HoodieBaseFile dataFile = new HoodieBaseFile(storage.getPathInfo(parquetFilePath));
// Write a log file for this parquet file
- Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(parquetFilePath.getParent())
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(dataFile.getFileId())
- .withInstantTime(dataFile.getCommitTime()).withStorage(storage).build();
+ Writer logWriter = HoodieLogFormatWriter.builder()
+ .withParentPath(parquetFilePath.getParent())
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .withLogFileId(dataFile.getFileId())
+ .withInstantTime(dataFile.getCommitTime())
+ .withStorage(storage)
+ .build();
List<HoodieRecord> records = SchemaTestUtil.generateTestRecords(logSchemaPath, dataPath).stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
Map<HeaderMetadataType, String> header = new HashMap<>(2);
header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, dataFile.getCommitTime());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index b9fbe74b6d02..24ee11c8ba1e 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -38,6 +38,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.HoodieLogFormatWriter;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
@@ -927,10 +928,10 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
String baseInstantTime,
String instantTime,
boolean writeDataBlock) throws IOException, InterruptedException {
- try (HoodieLogFormat.Writer writer = HoodieLogFormat.newWriterBuilder()
- .onParentPath(new StoragePath(tempDir.toString()))
+ try (HoodieLogFormat.Writer writer = HoodieLogFormatWriter.builder()
+ .withParentPath(new StoragePath(tempDir.toString()))
.withFileExtension(HoodieLogFile.DELTA_EXTENSION)
- .withFileId(fileId)
+ .withLogFileId(fileId)
.withInstantTime(instantTime)
.withStorage(storage)
.withSizeThreshold(Long.MAX_VALUE).build()) {