nsivabalan commented on code in PR #13444:
URL: https://github.com/apache/hudi/pull/13444#discussion_r2185713172
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java:
##########
@@ -109,18 +112,23 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext, HoodieStorage
                                long start, long length, boolean shouldUseRecordPosition) {
     this(readerContext, storage, tablePath, latestCommitTime, fileSlice, dataSchema,
         requestedSchema, internalSchemaOpt, hoodieTableMetaClient, props, start, length,
-        shouldUseRecordPosition, false, false, false);
+        shouldUseRecordPosition, false, false, false, Option.empty());
   }
 
   private HoodieFileGroupReader(HoodieReaderContext<T> readerContext, HoodieStorage storage, String tablePath,
                                 String latestCommitTime, FileSlice fileSlice,
                                 Schema dataSchema, Schema requestedSchema, Option<InternalSchema> internalSchemaOpt,
                                 HoodieTableMetaClient hoodieTableMetaClient, TypedProperties props,
-                                long start, long length, boolean shouldUseRecordPosition, boolean allowInflightInstants, boolean emitDelete, boolean sortOutput) {
+                                long start, long length, boolean shouldUseRecordPosition, boolean allowInflightInstants, boolean emitDelete, boolean sortOutput,
+                                Option<FileGroupUpdateCallback<T>> updateCallback) {
     this.readerContext = readerContext;
+    this.fileGroupUpdateCallback = updateCallback;
     this.metaClient = hoodieTableMetaClient;
     this.storage = storage;
     this.hoodieBaseFileOption = fileSlice.getBaseFile();
-    this.logFiles = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+    this.logFiles = fileSlice.getLogFiles()
Review Comment:
Don't really need to fix in this patch, but can we add a different API to `FileSlice` to expose `getCDClogFiles()`, so that `getLogFiles()` only returns non-CDC files to callers?
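Not part of the PR, just to illustrate the suggestion: a rough sketch of the split. The `LogFileView` class and string-based file names below are hypothetical stand-ins for `FileSlice` and `HoodieLogFile`, and the suffix check stands in for `HoodieCDCUtils.CDC_LOGFILE_SUFFIX`.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for FileSlice, partitioning its log files by a CDC suffix.
class LogFileView {
    // Assumption: CDC log files are identified by a file-name suffix,
    // mirroring HoodieCDCUtils.CDC_LOGFILE_SUFFIX in the real code.
    static final String CDC_SUFFIX = "-cdc";

    private final List<String> allLogFiles;

    LogFileView(List<String> allLogFiles) {
        this.allLogFiles = allLogFiles;
    }

    // What getLogFiles() could return: only non-CDC files, so merge-path
    // callers never have to filter CDC files themselves.
    List<String> getLogFiles() {
        return allLogFiles.stream()
            .filter(f -> !f.endsWith(CDC_SUFFIX))
            .collect(Collectors.toList());
    }

    // Dedicated accessor for CDC readers, per the suggestion above.
    List<String> getCdcLogFiles() {
        return allLogFiles.stream()
            .filter(f -> f.endsWith(CDC_SUFFIX))
            .collect(Collectors.toList());
    }
}
```

With that split, call sites like the constructor in this diff would no longer need to know about the CDC suffix at all.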
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -565,27 +570,62 @@ protected boolean hasNextBaseRecord(T baseRecord, BufferedRecord<T> logRecordInf
       Pair<Boolean, T> isDeleteAndRecord = merge(baseRecordInfo, logRecordInfo);
       if (!isDeleteAndRecord.getLeft()) {
         // Updates
-        nextRecord = readerContext.seal(isDeleteAndRecord.getRight());
+        nextRecord = readerContext.seal(applyOutputSchemaConversion(isDeleteAndRecord.getRight()));
+        // If the record is not the same as the base record, we can emit an update
+        if (isDeleteAndRecord.getRight() != baseRecord) {
+          callbackOption.ifPresent(callback -> {
+            BufferedRecord<T> mergeResult = BufferedRecord.forRecordWithContext(nextRecord, readerContext.getSchemaHandler().getRequestedSchema(), readerContext, orderingFieldName, false);
+            callback.onUpdate(readerContext.constructHoodieRecord(applyOutputSchemaConversion(baseRecordInfo)), readerContext.constructHoodieRecord(applyOutputSchemaConversion(logRecordInfo)),
+                readerContext.constructHoodieRecord(applyOutputSchemaConversion(mergeResult)));
+          });
+        }
         readStats.incrementNumUpdates();
         return true;
-      } else if (emitDelete) {
-        // emit Deletes
-        nextRecord = readerContext.getDeleteRow(isDeleteAndRecord.getRight(), baseRecordInfo.getRecordKey());
-        readStats.incrementNumDeletes();
-        return nextRecord != null;
       } else {
-        // not emit Deletes
+        // emit Deletes
+        callbackOption.ifPresent(callback -> callback.onDelete(readerContext.constructHoodieRecord(applyOutputSchemaConversion(baseRecordInfo)),
+            readerContext.constructHoodieRecord(applyOutputSchemaConversion(logRecordInfo))));
         readStats.incrementNumDeletes();
-        return false;
+        if (emitDelete) {
+          nextRecord = applyOutputSchemaConversion(readerContext.getDeleteRow(isDeleteAndRecord.getRight(), baseRecordInfo.getRecordKey()));
Review Comment:
Prior to this patch, can you help me understand where we were applying the schema conversion?
Looks like with this patch, even if callbackOption is empty, we are doing something extra here that was not happening before.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -565,27 +570,62 @@ protected boolean hasNextBaseRecord(T baseRecord, BufferedRecord<T> logRecordInf
       Pair<Boolean, T> isDeleteAndRecord = merge(baseRecordInfo, logRecordInfo);
       if (!isDeleteAndRecord.getLeft()) {
         // Updates
-        nextRecord = readerContext.seal(isDeleteAndRecord.getRight());
+        nextRecord = readerContext.seal(applyOutputSchemaConversion(isDeleteAndRecord.getRight()));
+        // If the record is not the same as the base record, we can emit an update
+        if (isDeleteAndRecord.getRight() != baseRecord) {
+          callbackOption.ifPresent(callback -> {
+            BufferedRecord<T> mergeResult = BufferedRecord.forRecordWithContext(nextRecord, readerContext.getSchemaHandler().getRequestedSchema(), readerContext, orderingFieldName, false);
Review Comment:
Orthogonal point: sometimes it might be confusing for devs what is actually stored in BufferedRecord as the record type — whether it's a HoodieRecord or the raw record representation (e.g., InternalRow in case of Spark, GenericRecord in case of Avro).
Because in many places we have methods named `getRecord()`, which might lead to confusion.
Not sure if we can give it any better naming.
Just a thought.
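One hypothetical direction for that naming concern (none of these names are proposed in this PR): make the engine-native nature explicit in the accessor name and its javadoc, so readers don't expect a HoodieRecord.

```java
// Hypothetical sketch: a buffered record whose accessor name makes clear it
// holds the engine-native row type (e.g. InternalRow for Spark, GenericRecord
// for Avro), never a wrapped HoodieRecord.
class EngineBufferedRecord<T> {
    private final String recordKey;
    private final T engineRecord;

    EngineBufferedRecord(String recordKey, T engineRecord) {
        this.recordKey = recordKey;
        this.engineRecord = engineRecord;
    }

    String getRecordKey() {
        return recordKey;
    }

    /** Returns the raw engine-native representation, NOT a HoodieRecord. */
    T getEngineRecord() {
        return engineRecord;
    }
}
```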
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala:
##########
@@ -402,10 +375,8 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
           && currentCDCFileSplit.getBeforeFileSlice.isPresent)
           loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
         val absLogPath = new StoragePath(basePath, currentCDCFileSplit.getCdcFiles.get(0))
-        val morSplit = HoodieMergeOnReadFileSplit(None, List(new HoodieLogFile(storage.getPathInfo(absLogPath))))
-        val logFileIterator = new LogFileIterator(
Review Comment:
This existing LogFileIterator, which is in Iterators.scala, accounts for isDeleteOperation.
Can you remind me whether FileGroupReader/FileGroupRecordBuffer accounts for it as well? I did not see any explicit checks.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -598,11 +638,13 @@ protected boolean hasNextLogRecord() {
     while (logRecordIterator.hasNext()) {
       BufferedRecord<T> nextRecordInfo = logRecordIterator.next();
       if (!nextRecordInfo.isDelete()) {
-        nextRecord = nextRecordInfo.getRecord();
+        BufferedRecord<T> convertedBufferedRecord = applyOutputSchemaConversion(nextRecordInfo);
+        nextRecord = convertedBufferedRecord.getRecord();
+        callbackOption.ifPresent(callback -> callback.onInsert(readerContext.constructHoodieRecord(convertedBufferedRecord)));
         readStats.incrementNumInserts();
         return true;
       } else if (emitDelete) {
-        nextRecord = readerContext.getDeleteRow(nextRecordInfo.getRecord(), nextRecordInfo.getRecordKey());
+        nextRecord = applyOutputSchemaConversion(readerContext.getDeleteRow(nextRecordInfo.getRecord(), nextRecordInfo.getRecordKey()));
Review Comment:
Why is no callback invoked here?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala:
##########
@@ -482,47 +449,74 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
       if (!same) {
         // clear up the beforeImageRecords
         beforeImageRecords.clear()
-        val iter = loadFileSlice(fileSlice)
-        iter.foreach { row =>
-          val key = getRecordKey(row)
-          // Due to the reuse buffer mechanism of Spark serialization,
-          // we have to copy the serialized result if we need to retain its reference
-          beforeImageRecords.put(key, serialize(row, copy = true))
+        val iter = loadFileSliceWithKeys(fileSlice)
+        iter.foreach { tuple =>
+          beforeImageRecords.put(tuple._1, tuple._2)
         }
         // reset beforeImageFiles
         beforeImageFiles.clear()
         beforeImageFiles.append(files: _*)
       }
     }
 
+  private def loadFileSliceWithKeys(fileSlice: FileSlice): Iterator[(String, InternalRow)] = {
+    val readerContext = new SparkFileFormatInternalRowReaderContext(parquetReader, Seq.empty, Seq.empty,
+      conf, metaClient.getTableConfig)
+    loadFileSlice(fileSlice, readerContext).map(internalRow => {
+      val recordKey = readerContext.getRecordKey(internalRow, avroSchema)
+      (recordKey, internalRow)
+    })
+  }
+
   private def loadFileSlice(fileSlice: FileSlice): Iterator[InternalRow] = {
-    val baseFileInfo = storage.getPathInfo(fileSlice.getBaseFile.get().getStoragePath)
-    val basePartitionedFile = sparkPartitionedFileUtils.createPartitionedFile(
-      InternalRow.empty,
-      baseFileInfo.getPath,
-      0,
-      baseFileInfo.getLength
-    )
-    val logFiles = fileSlice.getLogFiles
-      .sorted(HoodieLogFile.getLogFileComparator)
-      .collect(Collectors.toList[HoodieLogFile])
-      .asScala.toList
-      .filterNot(_.getFileName.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
-
-    if (logFiles.isEmpty) {
-      // no log files, just load the base parquet file
-      parquetReader(basePartitionedFile)
-    } else {
-      // use [[RecordMergingFileIterator]] to load both the base file and log files
-      val morSplit = HoodieMergeOnReadFileSplit(Some(basePartitionedFile), logFiles)
-      new RecordMergingFileIterator(
-        morSplit,
-        BaseFileReader(parquetReader, originTableSchema.structTypeSchema),
-        originTableSchema,
-        originTableSchema,
-        tableState,
-        conf.unwrapAs(classOf[Configuration]))
-    }
+    val readerContext = new SparkFileFormatInternalRowReaderContext(parquetReader, Seq.empty, Seq.empty,
+      conf, metaClient.getTableConfig)
+    loadFileSlice(fileSlice, readerContext)
+  }
+
+  private def loadFileSlice(fileSlice: FileSlice, readerContext: SparkFileFormatInternalRowReaderContext): Iterator[InternalRow] = {
+    val fileGroupReader = HoodieFileGroupReader.newBuilder()
+      .withReaderContext(readerContext)
+      .withHoodieTableMetaClient(metaClient)
+      .withFileSlice(fileSlice)
+      .withDataSchema(avroSchema)
+      .withRequestedSchema(avroSchema)
+      .withInternalSchema(toJavaOption(originTableSchema.internalSchema))
+      .withProps(readerProperties)
+      .withLatestCommitTime(split.changes.last.getInstant)
+      .build()
+    CloseableIteratorListener.addListener(fileGroupReader.getClosableIterator).asScala
+  }
+
+  private def loadLogFile(logFile: HoodieLogFile, instant: String): Iterator[BufferedRecord[InternalRow]] = {
+    val partitionPath = FSUtils.getRelativePartitionPath(metaClient.getBasePath, logFile.getPath.getParent)
+    val readerContext = new SparkFileFormatInternalRowReaderContext(parquetReader, Seq.empty, Seq.empty,
+      conf, metaClient.getTableConfig)
+    readerContext.setLatestCommitTime(instant)
+    readerContext.setHasBootstrapBaseFile(false)
+    readerContext.setHasLogFiles(true)
+    readerContext.initRecordMerger(readerProperties)
+    readerContext.setSchemaHandler(
+      new FileGroupReaderSchemaHandler[InternalRow](readerContext, avroSchema, avroSchema,
+        Option.empty(), metaClient.getTableConfig, readerProperties))
+    val recordBuffer = new KeyBasedFileGroupRecordBuffer[InternalRow](readerContext, metaClient,
+      readerContext.getMergeMode, readerProperties, new HoodieReadStats,
+      Option.ofNullable(metaClient.getTableConfig.getPreCombineField), true, Option.empty())
+
+    HoodieMergedLogRecordReader.newBuilder[InternalRow]
+      .withStorage(metaClient.getStorage)
+      .withHoodieReaderContext(readerContext)
+      .withLogFiles(Collections.singletonList(logFile))
+      .withReverseReader(false)
+      .withBufferSize(HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue)
+      .withPartition(partitionPath)
+      .withAllowInflightInstants(true)
+      .withMetaClient(metaClient)
+      .withAllowInflightInstants(true)
Review Comment:
`withAllowInflightInstants` is expected to be used only for MDT record generation during writes.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -565,27 +570,62 @@ protected boolean hasNextBaseRecord(T baseRecord, BufferedRecord<T> logRecordInf
       Pair<Boolean, T> isDeleteAndRecord = merge(baseRecordInfo, logRecordInfo);
       if (!isDeleteAndRecord.getLeft()) {
         // Updates
-        nextRecord = readerContext.seal(isDeleteAndRecord.getRight());
+        nextRecord = readerContext.seal(applyOutputSchemaConversion(isDeleteAndRecord.getRight()));
+        // If the record is not the same as the base record, we can emit an update
+        if (isDeleteAndRecord.getRight() != baseRecord) {
+          callbackOption.ifPresent(callback -> {
+            BufferedRecord<T> mergeResult = BufferedRecord.forRecordWithContext(nextRecord, readerContext.getSchemaHandler().getRequestedSchema(), readerContext, orderingFieldName, false);
+            callback.onUpdate(readerContext.constructHoodieRecord(applyOutputSchemaConversion(baseRecordInfo)), readerContext.constructHoodieRecord(applyOutputSchemaConversion(logRecordInfo)),
+                readerContext.constructHoodieRecord(applyOutputSchemaConversion(mergeResult)));
+          });
+        }
         readStats.incrementNumUpdates();
         return true;
-      } else if (emitDelete) {
-        // emit Deletes
-        nextRecord = readerContext.getDeleteRow(isDeleteAndRecord.getRight(), baseRecordInfo.getRecordKey());
-        readStats.incrementNumDeletes();
-        return nextRecord != null;
       } else {
-        // not emit Deletes
+        // emit Deletes
+        callbackOption.ifPresent(callback -> callback.onDelete(readerContext.constructHoodieRecord(applyOutputSchemaConversion(baseRecordInfo)),
+            readerContext.constructHoodieRecord(applyOutputSchemaConversion(logRecordInfo))));
Review Comment:
Won't we get just HoodieKeys from the delete log block for deletes? Trying to understand how applying the output schema conversion works here. Do we just inject default values for the entire set of data cols?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -565,27 +570,62 @@ protected boolean hasNextBaseRecord(T baseRecord, BufferedRecord<T> logRecordInf
       Pair<Boolean, T> isDeleteAndRecord = merge(baseRecordInfo, logRecordInfo);
       if (!isDeleteAndRecord.getLeft()) {
         // Updates
-        nextRecord = readerContext.seal(isDeleteAndRecord.getRight());
+        nextRecord = readerContext.seal(applyOutputSchemaConversion(isDeleteAndRecord.getRight()));
+        // If the record is not the same as the base record, we can emit an update
+        if (isDeleteAndRecord.getRight() != baseRecord) {
+          callbackOption.ifPresent(callback -> {
+            BufferedRecord<T> mergeResult = BufferedRecord.forRecordWithContext(nextRecord, readerContext.getSchemaHandler().getRequestedSchema(), readerContext, orderingFieldName, false);
+            callback.onUpdate(readerContext.constructHoodieRecord(applyOutputSchemaConversion(baseRecordInfo)), readerContext.constructHoodieRecord(applyOutputSchemaConversion(logRecordInfo)),
+                readerContext.constructHoodieRecord(applyOutputSchemaConversion(mergeResult)));
+          });
+        }
         readStats.incrementNumUpdates();
         return true;
-      } else if (emitDelete) {
-        // emit Deletes
-        nextRecord = readerContext.getDeleteRow(isDeleteAndRecord.getRight(), baseRecordInfo.getRecordKey());
-        readStats.incrementNumDeletes();
-        return nextRecord != null;
       } else {
-        // not emit Deletes
+        // emit Deletes
+        callbackOption.ifPresent(callback -> callback.onDelete(readerContext.constructHoodieRecord(applyOutputSchemaConversion(baseRecordInfo)),
+            readerContext.constructHoodieRecord(applyOutputSchemaConversion(logRecordInfo))));
         readStats.incrementNumDeletes();
-        return false;
+        if (emitDelete) {
+          nextRecord = applyOutputSchemaConversion(readerContext.getDeleteRow(isDeleteAndRecord.getRight(), baseRecordInfo.getRecordKey()));
+          return nextRecord != null;
+        } else {
+          return false;
+        }
       }
     }
     // Inserts
-    nextRecord = readerContext.seal(baseRecord);
+    nextRecord = readerContext.seal(applyOutputSchemaConversion(baseRecord));
     readStats.incrementNumInserts();
     return true;
   }
 
+  /**
+   * Applies the final output schema conversion to the buffered record if required. This ensures the records match the requested schema.
+   * @param bufferedRecord the buffered record to convert
+   * @return a new buffered record with the converted record and the proper schema ID set
+   */
+  protected BufferedRecord<T> applyOutputSchemaConversion(BufferedRecord<T> bufferedRecord) {
+    if (bufferedRecord.getRecord() != null && outputConverter.isPresent()) {
+      return new BufferedRecord<>(bufferedRecord.getRecordKey(), bufferedRecord.getOrderingValue(),
+          outputConverter.get().apply(bufferedRecord.getRecord()), readerContext.encodeAvroSchema(readerContext.getSchemaHandler().getRequestedSchema()), bufferedRecord.isDelete());
Review Comment:
Minor: `readerContext.encodeAvroSchema(readerContext.getSchemaHandler().getRequestedSchema())` will be the same for the entire file group. We could initialize it once in the constructor and use it here rather than calling `encodeAvroSchema` for every record.
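To illustrate, a minimal sketch of the hoisting being suggested. The `SchemaIdCache` class is hypothetical, and the `Function` parameter stands in for `readerContext.encodeAvroSchema(readerContext.getSchemaHandler().getRequestedSchema())`, which is constant per file group.

```java
import java.util.function.Function;

// Hypothetical sketch: the encoded requested-schema id is constant for a whole
// file group, so compute it once in the constructor instead of once per record.
class SchemaIdCache {
    private final int requestedSchemaId;

    SchemaIdCache(Function<String, Integer> encodeAvroSchema, String requestedSchema) {
        // Computed exactly once, at construction time.
        this.requestedSchemaId = encodeAvroSchema.apply(requestedSchema);
    }

    // The per-record path now just reads the cached id; no re-encoding.
    int schemaIdFor(Object record) {
        return requestedSchemaId;
    }
}
```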
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -565,27 +570,62 @@ protected boolean hasNextBaseRecord(T baseRecord, BufferedRecord<T> logRecordInf
       Pair<Boolean, T> isDeleteAndRecord = merge(baseRecordInfo, logRecordInfo);
       if (!isDeleteAndRecord.getLeft()) {
         // Updates
-        nextRecord = readerContext.seal(isDeleteAndRecord.getRight());
+        nextRecord = readerContext.seal(applyOutputSchemaConversion(isDeleteAndRecord.getRight()));
+        // If the record is not the same as the base record, we can emit an update
+        if (isDeleteAndRecord.getRight() != baseRecord) {
+          callbackOption.ifPresent(callback -> {
+            BufferedRecord<T> mergeResult = BufferedRecord.forRecordWithContext(nextRecord, readerContext.getSchemaHandler().getRequestedSchema(), readerContext, orderingFieldName, false);
+            callback.onUpdate(readerContext.constructHoodieRecord(applyOutputSchemaConversion(baseRecordInfo)), readerContext.constructHoodieRecord(applyOutputSchemaConversion(logRecordInfo)),
+                readerContext.constructHoodieRecord(applyOutputSchemaConversion(mergeResult)));
+          });
+        }
         readStats.incrementNumUpdates();
         return true;
-      } else if (emitDelete) {
-        // emit Deletes
-        nextRecord = readerContext.getDeleteRow(isDeleteAndRecord.getRight(), baseRecordInfo.getRecordKey());
-        readStats.incrementNumDeletes();
-        return nextRecord != null;
       } else {
-        // not emit Deletes
+        // emit Deletes
+        callbackOption.ifPresent(callback -> callback.onDelete(readerContext.constructHoodieRecord(applyOutputSchemaConversion(baseRecordInfo)),
+            readerContext.constructHoodieRecord(applyOutputSchemaConversion(logRecordInfo))));
         readStats.incrementNumDeletes();
-        return false;
+        if (emitDelete) {
+          nextRecord = applyOutputSchemaConversion(readerContext.getDeleteRow(isDeleteAndRecord.getRight(), baseRecordInfo.getRecordKey()));
Review Comment:
Oh, it's happening at L573 and L599 as well.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala:
##########
@@ -482,47 +449,74 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
       if (!same) {
         // clear up the beforeImageRecords
         beforeImageRecords.clear()
-        val iter = loadFileSlice(fileSlice)
-        iter.foreach { row =>
-          val key = getRecordKey(row)
-          // Due to the reuse buffer mechanism of Spark serialization,
-          // we have to copy the serialized result if we need to retain its reference
-          beforeImageRecords.put(key, serialize(row, copy = true))
+        val iter = loadFileSliceWithKeys(fileSlice)
+        iter.foreach { tuple =>
+          beforeImageRecords.put(tuple._1, tuple._2)
         }
         // reset beforeImageFiles
         beforeImageFiles.clear()
         beforeImageFiles.append(files: _*)
       }
     }
 
+  private def loadFileSliceWithKeys(fileSlice: FileSlice): Iterator[(String, InternalRow)] = {
+    val readerContext = new SparkFileFormatInternalRowReaderContext(parquetReader, Seq.empty, Seq.empty,
+      conf, metaClient.getTableConfig)
+    loadFileSlice(fileSlice, readerContext).map(internalRow => {
+      val recordKey = readerContext.getRecordKey(internalRow, avroSchema)
+      (recordKey, internalRow)
+    })
+  }
+
   private def loadFileSlice(fileSlice: FileSlice): Iterator[InternalRow] = {
-    val baseFileInfo = storage.getPathInfo(fileSlice.getBaseFile.get().getStoragePath)
-    val basePartitionedFile = sparkPartitionedFileUtils.createPartitionedFile(
-      InternalRow.empty,
-      baseFileInfo.getPath,
-      0,
-      baseFileInfo.getLength
-    )
-    val logFiles = fileSlice.getLogFiles
-      .sorted(HoodieLogFile.getLogFileComparator)
-      .collect(Collectors.toList[HoodieLogFile])
-      .asScala.toList
-      .filterNot(_.getFileName.endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX))
-
-    if (logFiles.isEmpty) {
-      // no log files, just load the base parquet file
-      parquetReader(basePartitionedFile)
-    } else {
-      // use [[RecordMergingFileIterator]] to load both the base file and log files
-      val morSplit = HoodieMergeOnReadFileSplit(Some(basePartitionedFile), logFiles)
-      new RecordMergingFileIterator(
-        morSplit,
-        BaseFileReader(parquetReader, originTableSchema.structTypeSchema),
-        originTableSchema,
-        originTableSchema,
-        tableState,
-        conf.unwrapAs(classOf[Configuration]))
-    }
+    val readerContext = new SparkFileFormatInternalRowReaderContext(parquetReader, Seq.empty, Seq.empty,
+      conf, metaClient.getTableConfig)
+    loadFileSlice(fileSlice, readerContext)
+  }
+
+  private def loadFileSlice(fileSlice: FileSlice, readerContext: SparkFileFormatInternalRowReaderContext): Iterator[InternalRow] = {
+    val fileGroupReader = HoodieFileGroupReader.newBuilder()
+      .withReaderContext(readerContext)
+      .withHoodieTableMetaClient(metaClient)
+      .withFileSlice(fileSlice)
+      .withDataSchema(avroSchema)
+      .withRequestedSchema(avroSchema)
+      .withInternalSchema(toJavaOption(originTableSchema.internalSchema))
+      .withProps(readerProperties)
+      .withLatestCommitTime(split.changes.last.getInstant)
+      .build()
+    CloseableIteratorListener.addListener(fileGroupReader.getClosableIterator).asScala
+  }
+
+  private def loadLogFile(logFile: HoodieLogFile, instant: String): Iterator[BufferedRecord[InternalRow]] = {
+    val partitionPath = FSUtils.getRelativePartitionPath(metaClient.getBasePath, logFile.getPath.getParent)
+    val readerContext = new SparkFileFormatInternalRowReaderContext(parquetReader, Seq.empty, Seq.empty,
+      conf, metaClient.getTableConfig)
+    readerContext.setLatestCommitTime(instant)
+    readerContext.setHasBootstrapBaseFile(false)
+    readerContext.setHasLogFiles(true)
+    readerContext.initRecordMerger(readerProperties)
+    readerContext.setSchemaHandler(
+      new FileGroupReaderSchemaHandler[InternalRow](readerContext, avroSchema, avroSchema,
+        Option.empty(), metaClient.getTableConfig, readerProperties))
+    val recordBuffer = new KeyBasedFileGroupRecordBuffer[InternalRow](readerContext, metaClient,
+      readerContext.getMergeMode, readerProperties, new HoodieReadStats,
+      Option.ofNullable(metaClient.getTableConfig.getPreCombineField), true, Option.empty())
+
+    HoodieMergedLogRecordReader.newBuilder[InternalRow]
+      .withStorage(metaClient.getStorage)
+      .withHoodieReaderContext(readerContext)
+      .withLogFiles(Collections.singletonList(logFile))
+      .withReverseReader(false)
+      .withBufferSize(HoodieMetadataConfig.MAX_READER_BUFFER_SIZE_PROP.defaultValue)
+      .withPartition(partitionPath)
+      .withAllowInflightInstants(true)
+      .withMetaClient(metaClient)
+      .withAllowInflightInstants(true)
Review Comment:
Repeated calls to `withAllowInflightInstants`.
Also, why do we need to set this to true?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -598,11 +638,13 @@ protected boolean hasNextLogRecord() {
     while (logRecordIterator.hasNext()) {
       BufferedRecord<T> nextRecordInfo = logRecordIterator.next();
       if (!nextRecordInfo.isDelete()) {
-        nextRecord = nextRecordInfo.getRecord();
+        BufferedRecord<T> convertedBufferedRecord = applyOutputSchemaConversion(nextRecordInfo);
+        nextRecord = convertedBufferedRecord.getRecord();
+        callbackOption.ifPresent(callback -> callback.onInsert(readerContext.constructHoodieRecord(convertedBufferedRecord)));
         readStats.incrementNumInserts();
         return true;
       } else if (emitDelete) {
-        nextRecord = readerContext.getDeleteRow(nextRecordInfo.getRecord(), nextRecordInfo.getRecordKey());
+        nextRecord = applyOutputSchemaConversion(readerContext.getDeleteRow(nextRecordInfo.getRecord(), nextRecordInfo.getRecordKey()));
Review Comment:
Because in L567 to L602 we would have invoked callbacks for updates and deletes, the only pending items are inserts from logs, so L643 alone would suffice.
Is my understanding right?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupUpdateCallback.java:
##########
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.model.HoodieRecord;
+
+public interface FileGroupUpdateCallback<T> {
Review Comment:
Javadocs, please.
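For illustration, a hedged sketch of what those javadocs could look like. The method names mirror how the callback is invoked in this diff (`onInsert`/`onUpdate`/`onDelete`); the interface name and the generic type `R` (standing in for `HoodieRecord`) are hypothetical, used only to keep the snippet self-contained.

```java
/**
 * Hypothetical sketch: callback invoked by the file group reader as it merges
 * a base file with its log files, once per resolved record. R stands in for
 * HoodieRecord so the example compiles on its own.
 */
interface FileGroupChangeCallback<R> {

    /** Invoked when a log record has no matching base-file record, i.e. a new insert. */
    void onInsert(R mergedRecord);

    /** Invoked when a log record updates an existing base-file record. */
    void onUpdate(R previousRecord, R incomingRecord, R mergedRecord);

    /** Invoked when a log record deletes an existing base-file record. */
    void onDelete(R previousRecord, R incomingRecord);
}
```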
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]