the-other-tim-brown commented on code in PR #13449:
URL: https://github.com/apache/hudi/pull/13449#discussion_r2152390612
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -546,12 +559,75 @@ public List<WriteStatus> close() {
status.getStat().setFileSizeInBytes(logFileSize);
}
+ // generate Secondary index stats if streaming is enabled.
+ if (!isSecondaryIndexStreamingDisabled()) {
+ // Adds secondary index only for the last log file write status. We do
not need to add secondary index stats
+ // for every log file written as part of the append handle write. The
last write status would update the
+ // secondary index considering all the log files.
+
trackMetadataIndexStatsForStreamingMetadataWrites(fileSliceOpt.or(this::getFileSlice),
statuses.stream().map(status ->
status.getStat().getPath()).collect(Collectors.toList()),
+ statuses.get(statuses.size() - 1));
+ }
+
return statuses;
} catch (IOException e) {
throw new HoodieUpsertException("Failed to close UpdateHandle", e);
}
}
+ private void
trackMetadataIndexStatsForStreamingMetadataWrites(Option<FileSlice>
fileSliceOpt, List<String> newLogFiles, WriteStatus status) {
+ // TODO: Optimise the computation for multiple secondary indexes
+ HoodieEngineContext engineContext = new
HoodieLocalEngineContext(hoodieTable.getStorageConf(), taskContextSupplier);
+ HoodieReaderContext readerContext =
engineContext.getReaderContextFactory(hoodieTable.getMetaClient()).getContext();
+
+ secondaryIndexDefns.forEach(secondaryIndexDefnPair -> {
+ // fetch primary key -> secondary index for prev file slice.
Review Comment:
Would it be more efficient to read this from the metadata table?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/IndexStats.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Class to hold all index stats required to generate Metadata records for all
enabled partitions.
+ * Supported stats are record level index stats and secondary index stats.
+ */
+public class IndexStats implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private final List<HoodieRecordDelegate> writtenRecordDelegates = new
ArrayList<>();
+ private final Map<String, List<SecondaryIndexStats>> secondaryIndexStats =
new HashMap<>();
+
+ void addHoodieRecordDelegate(HoodieRecordDelegate hoodieRecordDelegate) {
+ this.writtenRecordDelegates.add(hoodieRecordDelegate);
+ }
+
+ public List<HoodieRecordDelegate> getWrittenRecordDelegates() {
+ return writtenRecordDelegates;
+ }
+
+ public void instantiateSecondaryIndexStatsForIndex(String
secondaryIndexPartitionPath) {
+ secondaryIndexStats.put(secondaryIndexPartitionPath, new ArrayList<>());
+ }
+
+ public void addSecondaryIndexStats(String secondaryIndexPartitionPath,
String recordKey, String secondaryIndexValue, boolean isDeleted) {
+ secondaryIndexStats.computeIfAbsent(secondaryIndexPartitionPath, k -> new
ArrayList<>())
+ .add(new SecondaryIndexStats(recordKey, secondaryIndexValue,
isDeleted));
+ }
+
+ public Map<String, List<SecondaryIndexStats>> getSecondaryIndexStats() {
+ return secondaryIndexStats;
+ }
+
+ void clear() {
+ this.writtenRecordDelegates.clear();
+ this.secondaryIndexStats.clear();
+ }
Review Comment:
This doesn't seem necessary since the only usage is before dereferencing the
object itself
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -546,12 +559,75 @@ public List<WriteStatus> close() {
status.getStat().setFileSizeInBytes(logFileSize);
}
+ // generate Secondary index stats if streaming is enabled.
+ if (!isSecondaryIndexStreamingDisabled()) {
+ // Adds secondary index only for the last log file write status. We do
not need to add secondary index stats
+ // for every log file written as part of the append handle write. The
last write status would update the
+ // secondary index considering all the log files.
+
trackMetadataIndexStatsForStreamingMetadataWrites(fileSliceOpt.or(this::getFileSlice),
statuses.stream().map(status ->
status.getStat().getPath()).collect(Collectors.toList()),
+ statuses.get(statuses.size() - 1));
+ }
+
return statuses;
} catch (IOException e) {
throw new HoodieUpsertException("Failed to close UpdateHandle", e);
}
}
+ private void
trackMetadataIndexStatsForStreamingMetadataWrites(Option<FileSlice>
fileSliceOpt, List<String> newLogFiles, WriteStatus status) {
+ // TODO: Optimise the computation for multiple secondary indexes
+ HoodieEngineContext engineContext = new
HoodieLocalEngineContext(hoodieTable.getStorageConf(), taskContextSupplier);
+ HoodieReaderContext readerContext =
engineContext.getReaderContextFactory(hoodieTable.getMetaClient()).getContext();
+
+ secondaryIndexDefns.forEach(secondaryIndexDefnPair -> {
+ // fetch primary key -> secondary index for prev file slice.
+ Map<String, String> recordKeyToSecondaryKeyForPreviousFileSlice =
fileSliceOpt.map(fileSlice -> {
+ try {
+ return
SecondaryIndexRecordGenerationUtils.getRecordKeyToSecondaryKey(hoodieTable.getMetaClient(),
readerContext, fileSlice, writeSchemaWithMetaFields,
+ secondaryIndexDefnPair.getValue(), instantTime,
config.getProps(), false);
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to generate secondary index
stats ", e);
+ }
+ }).orElse(Collections.emptyMap());
+
+ // fetch primary key -> secondary index for latest file slice including
inflight.
Review Comment:
Do we need to read the full slice or just the new log files with
`emitDeletes` enabled so we can track the records being deleted?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -452,6 +461,58 @@ protected HoodieRecord<T> updateFileName(HoodieRecord<T>
record, Schema schema,
return record.prependMetaFields(schema, targetSchema, metadataValues,
prop);
}
+ private void trackMetadataIndexStats(Option<HoodieKey> hoodieKeyOpt,
Option<HoodieRecord> combinedRecordOpt, Option<HoodieRecord<T>> oldRecordOpt,
boolean isDelete) {
Review Comment:
The `hoodieKeyOpt` is always present in the callers; can the method simply
take in a `HoodieKey`?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java:
##########
@@ -136,4 +136,9 @@ public void write() {
throw new HoodieUpsertException("Failed to compact file slice: " +
fileSlice, e);
}
}
+
+ @Override
+ boolean isSecondaryIndexStreamingDisabled() {
+ return true;
Review Comment:
We will be defaulting to using this class, so why is this hardcoded to skip
secondary index streaming?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -226,6 +231,13 @@ private Option<FileSlice>
populateWriteStatAndFetchFileSlice(HoodieRecord record
return fileSlice;
}
+ private Option<FileSlice> getFileSlice() {
+ Option<FileSlice> fileSlice;
+ TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
+ fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
+ return fileSlice;
Review Comment:
nitpick: just return directly instead of assigning to local variable
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java:
##########
@@ -214,22 +214,11 @@ public void testRemoveMetadataStats() {
stats.put("field1", HoodieColumnRangeMetadata.<Comparable>create("f1",
"field1", 1, 2, 0, 2, 5, 10));
status.setStat(new HoodieWriteStat());
status.getStat().putRecordsStats(stats);
- assertEquals(1, status.getWrittenRecordDelegates().size());
+ assertEquals(1, status.getIndexStats().getWrittenRecordDelegates().size());
assertEquals(1, status.getStat().getColumnStats().get().size());
// Remove metadata stats
status.removeMetadataStats();
- assertEquals(0, status.getWrittenRecordDelegates().size());
- }
-
- @Test
- public void testDropErrorRecords() {
Review Comment:
Is this removed intentionally?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java:
##########
@@ -189,21 +188,12 @@ private void updateStatsForFailure() {
totalErrorRecords++;
}
- public WriteStatus removeMetadataIndexStatsAndErrorRecordsTracking() {
- removeMetadataStats();
- dropGranularErrorRecordsTracking();
- return this;
- }
-
public WriteStatus removeMetadataStats() {
- this.writtenRecordDelegates.clear();
+ this.indexStats.clear();
+ this.indexStats = null;
return this;
}
- public void dropGranularErrorRecordsTracking() {
- failedRecords.clear();
- }
Review Comment:
Is the removal of this and its usage intentional?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java:
##########
@@ -173,6 +186,23 @@ record = record.prependMetaFields(schema,
writeSchemaWithMetaFields, new Metadat
}
}
+ private void trackMetadataIndexStats(HoodieRecord record) {
+ if (isSecondaryIndexStreamingDisabled()) {
+ return;
+ }
+
+ // Add secondary index records for all the inserted records
+ secondaryIndexDefns.forEach(secondaryIndexPartitionPathFieldPair -> {
+ String secondaryIndexSourceField =
String.join(".",secondaryIndexPartitionPathFieldPair.getValue().getSourceFields());
+ if (record instanceof HoodieAvroIndexedRecord) {
Review Comment:
Can we make this more generic by using the ReaderContext?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -546,12 +559,75 @@ public List<WriteStatus> close() {
status.getStat().setFileSizeInBytes(logFileSize);
}
+ // generate Secondary index stats if streaming is enabled.
+ if (!isSecondaryIndexStreamingDisabled()) {
+ // Adds secondary index only for the last log file write status. We do
not need to add secondary index stats
+ // for every log file written as part of the append handle write. The
last write status would update the
+ // secondary index considering all the log files.
+
trackMetadataIndexStatsForStreamingMetadataWrites(fileSliceOpt.or(this::getFileSlice),
statuses.stream().map(status ->
status.getStat().getPath()).collect(Collectors.toList()),
Review Comment:
Just double checking my understanding here, we can assume all the statuses
are for log files because this is an append handle?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -452,6 +461,58 @@ protected HoodieRecord<T> updateFileName(HoodieRecord<T>
record, Schema schema,
return record.prependMetaFields(schema, targetSchema, metadataValues,
prop);
}
+ private void trackMetadataIndexStats(Option<HoodieKey> hoodieKeyOpt,
Option<HoodieRecord> combinedRecordOpt, Option<HoodieRecord<T>> oldRecordOpt,
boolean isDelete) {
+ if (isSecondaryIndexStreamingDisabled()) {
+ return;
+ }
+ HoodieEngineContext engineContext = new
HoodieLocalEngineContext(hoodieTable.getStorageConf(), taskContextSupplier);
+ HoodieReaderContext readerContext =
engineContext.getReaderContextFactory(hoodieTable.getMetaClient()).getContext();
+
+ secondaryIndexDefns.forEach(secondaryIndexPartitionPathFieldPair -> {
+ String secondaryIndexSourceField =
String.join(".",secondaryIndexPartitionPathFieldPair.getValue().getSourceFields());
Review Comment:
nitpick: add a space after `".",`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]