This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 5129773 [HUDI-2844][CLI] Fixing archived Timeline crashing if timeline contains REPLACE_COMMIT (#4091) 5129773 is described below commit 51297736ca7f72076376d4d245b3c161bdfe57b2 Author: Alexey Kudinkin <ale...@infinilake.com> AuthorDate: Wed Nov 24 16:53:29 2021 -0800 [HUDI-2844][CLI] Fixing archived Timeline crashing if timeline contains REPLACE_COMMIT (#4091) --- .../table/timeline/HoodieArchivedTimeline.java | 59 +++++++++++----------- 1 file changed, 30 insertions(+), 29 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java index 4926b2a..faff4ae 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java @@ -18,6 +18,10 @@ package org.apache.hudi.common.table.timeline; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieArchivedMetaEntry; import org.apache.hudi.common.model.HoodieLogFile; @@ -28,14 +32,10 @@ import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; - -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import javax.annotation.Nonnull; import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; @@ -51,7 +51,6 @@ import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; -import java.util.stream.Stream; /** * Represents the Archived Timeline for the Hoodie table. Instants for the last 12 hours (configurable) is in the @@ -147,7 +146,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { final String instantTime = record.get(HoodiePartitionMetadata.COMMIT_TIME_KEY).toString(); final String action = record.get(ACTION_TYPE_KEY).toString(); if (loadDetails) { - Option.ofNullable(record.get(getMetadataKey(action))).map(actionData -> { + getMetadataKey(action).map(key -> { + Object actionData = record.get(key); if (action.equals(HoodieTimeline.COMPACTION_ACTION)) { this.readCommits.put(instantTime, HoodieAvroUtils.indexedRecordToBytes((IndexedRecord)actionData)); } else { @@ -159,22 +159,25 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { return new HoodieInstant(HoodieInstant.State.valueOf(record.get(ACTION_STATE).toString()), action, instantTime); } - private String getMetadataKey(String action) { + @Nonnull + private Option<String> getMetadataKey(String action) { switch (action) { case HoodieTimeline.CLEAN_ACTION: - return "hoodieCleanMetadata"; + return Option.of("hoodieCleanMetadata"); case HoodieTimeline.COMMIT_ACTION: - return "hoodieCommitMetadata"; case HoodieTimeline.DELTA_COMMIT_ACTION: - return "hoodieCommitMetadata"; + return Option.of("hoodieCommitMetadata"); case HoodieTimeline.ROLLBACK_ACTION: - return "hoodieRollbackMetadata"; + return Option.of("hoodieRollbackMetadata"); case HoodieTimeline.SAVEPOINT_ACTION: - return "hoodieSavePointMetadata"; + return Option.of("hoodieSavePointMetadata"); case HoodieTimeline.COMPACTION_ACTION: - return "hoodieCompactionPlan"; + return Option.of("hoodieCompactionPlan"); + case HoodieTimeline.REPLACE_COMMIT_ACTION: + return Option.of("hoodieReplaceCommitMetadata"); default: - throw new HoodieIOException("Unknown action in metadata " + action); + LOG.error(String.format("Unknown action in metadata (%s)", action)); + return Option.empty(); } } @@ -199,35 +202,33 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline { private List<HoodieInstant> loadInstants(TimeRangeFilter filter, boolean loadInstantDetails, Function<GenericRecord, Boolean> commitsFilter) { try { - // list all files + // List all files FileStatus[] fsStatuses = metaClient.getFs().globStatus( new Path(metaClient.getArchivePath() + "/.commits_.archive*")); - // sort files by version suffix in reverse (implies reverse chronological order) + // Sort files by version suffix in reverse (implies reverse chronological order) Arrays.sort(fsStatuses, new ArchiveFileVersionComparator()); List<HoodieInstant> instantsInRange = new ArrayList<>(); for (FileStatus fs : fsStatuses) { - //read the archived file + // Read the archived file try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(), new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) { int instantsInPreviousFile = instantsInRange.size(); - //read the avro blocks + // Read the avro blocks while (reader.hasNext()) { HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next(); // TODO If we can store additional metadata in datablock, we can skip parsing records // (such as startTime, endTime of records in the block) List<IndexedRecord> records = blk.getRecords(); - // filter blocks in desired time window - Stream<HoodieInstant> instantsInBlkStream = records.stream() - .filter(r -> commitsFilter.apply((GenericRecord) r)) - .map(r -> readCommit((GenericRecord) r, loadInstantDetails)); - - if (filter != null) { - instantsInBlkStream = instantsInBlkStream.filter(filter::isInRange); - } - - instantsInRange.addAll(instantsInBlkStream.collect(Collectors.toList())); + // Filter blocks in desired time window + instantsInRange.addAll( + records.stream() + .filter(r -> commitsFilter.apply((GenericRecord) r)) + .map(r -> readCommit((GenericRecord) r, loadInstantDetails)) + .filter(c -> filter == null || filter.isInRange(c)) + .collect(Collectors.toList()) + ); } if (filter != null) {