yihua commented on code in PR #9899:
URL: https://github.com/apache/hudi/pull/9899#discussion_r1369141606
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java:
##########
@@ -215,6 +219,7 @@ protected List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(
    String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
    List<Pair<String, HoodieBaseFile>> baseFilesForAllPartitions = HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable);
+   context.clearJobStatus();
Review Comment:
This shouldn't be added. Key range loading has not finished here.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -758,7 +762,6 @@ protected void reconcileAgainstMarkers(HoodieEngineContext context,
    }
    // Now delete partially written files
-   context.setJobStatus(this.getClass().getSimpleName(), "Delete all partially written files: " + config.getTableName());
Review Comment:
Why is this being deleted?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -61,6 +61,7 @@ public HoodieWriteMetadata<O> write(String instantTime,
    // perform index loop up to get existing location of records
    context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + table.getConfig().getTableName());
    taggedRecords = tag(dedupedRecords, context, table);
+   context.clearJobStatus();
Review Comment:
If lazy execution happens afterwards, the job status may not be properly populated. Have you verified, for all the places touched, that this won't happen?
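To illustrate the concern: a minimal, hypothetical sketch using `java.util.stream` laziness as a stand-in for Spark's lazy RDD evaluation. `LazyStatusDemo` and `jobStatus` are illustrative names, not Hudi or Spark APIs; the point is only that a `finally` block around a method that merely *composes* a lazy pipeline runs before the pipeline is ever executed.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

public class LazyStatusDemo {
  // Hypothetical stand-in for the engine context's job status.
  static final AtomicReference<String> jobStatus = new AtomicReference<>("");

  // Mirrors the shape of the change under review: set the status,
  // compose a lazy pipeline, then clear the status in finally.
  public static Stream<String> buildPipeline(List<String> input) {
    jobStatus.set("Tagging");
    try {
      // map() is lazy: this lambda runs only at the terminal operation.
      return input.stream().map(s -> "status seen: " + jobStatus.get());
    } finally {
      jobStatus.set(""); // runs before the pipeline is ever consumed
    }
  }

  public static void main(String[] args) {
    // The terminal operation fires after the finally block has already
    // cleared the status, so the "job" observes an empty status.
    String observed = buildPipeline(List.of("a")).findFirst().orElseThrow();
    System.out.println("[" + observed + "]"); // prints "[status seen: ]"
  }
}
```

The same shape applies to any Hudi method that returns an unevaluated `HoodieData`/RDD: the status cleared in `finally` is gone by the time an action triggers the stages.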
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -111,44 +111,48 @@ public BaseSparkCommitActionExecutor(HoodieEngineContext context,
  private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
    context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering: " + config.getTableName());
-   Set<HoodieFileGroupId> fileGroupsInPendingClustering =
-       table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
-   // Skip processing if there is no inflight clustering
-   if (fileGroupsInPendingClustering.isEmpty()) {
-     return inputRecords;
-   }
+   try {
+     Set<HoodieFileGroupId> fileGroupsInPendingClustering =
+         table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
+     // Skip processing if there is no inflight clustering
+     if (fileGroupsInPendingClustering.isEmpty()) {
+       return inputRecords;
+     }
-   UpdateStrategy<T, HoodieData<HoodieRecord<T>>> updateStrategy = (UpdateStrategy<T, HoodieData<HoodieRecord<T>>>) ReflectionUtils
-       .loadClass(config.getClusteringUpdatesStrategyClass(), new Class<?>[] {HoodieEngineContext.class, HoodieTable.class, Set.class},
-           this.context, table, fileGroupsInPendingClustering);
-   // For SparkAllowUpdateStrategy with rollback pending clustering as false, need not handle
-   // the file group intersection between current ingestion and pending clustering file groups.
-   // This will be handled at the conflict resolution strategy.
-   if (updateStrategy instanceof SparkAllowUpdateStrategy && !config.isRollbackPendingClustering()) {
-     return inputRecords;
-   }
-   Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
-       updateStrategy.handleUpdate(inputRecords);
+     UpdateStrategy<T, HoodieData<HoodieRecord<T>>> updateStrategy = (UpdateStrategy<T, HoodieData<HoodieRecord<T>>>) ReflectionUtils
+         .loadClass(config.getClusteringUpdatesStrategyClass(), new Class<?>[] {HoodieEngineContext.class, HoodieTable.class, Set.class},
+             this.context, table, fileGroupsInPendingClustering);
+     // For SparkAllowUpdateStrategy with rollback pending clustering as false, need not handle
+     // the file group intersection between current ingestion and pending clustering file groups.
+     // This will be handled at the conflict resolution strategy.
+     if (updateStrategy instanceof SparkAllowUpdateStrategy && !config.isRollbackPendingClustering()) {
+       return inputRecords;
+     }
+     Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
+         updateStrategy.handleUpdate(inputRecords);
-   Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = recordsAndPendingClusteringFileGroups.getRight();
-   if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
+     Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = recordsAndPendingClusteringFileGroups.getRight();
+     if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
+       return recordsAndPendingClusteringFileGroups.getLeft();
+     }
+     // there are file groups pending clustering and receiving updates, so rollback the pending clustering instants
+     // there could be race condition, for example, if the clustering completes after instants are fetched but before rollback completed
+     if (config.isRollbackPendingClustering()) {
+       Set<HoodieInstant> pendingClusteringInstantsToRollback = getAllFileGroupsInPendingClusteringPlans(table.getMetaClient()).entrySet().stream()
+           .filter(e -> fileGroupsWithUpdatesAndPendingClustering.contains(e.getKey()))
+           .map(Map.Entry::getValue)
+           .collect(Collectors.toSet());
+       pendingClusteringInstantsToRollback.forEach(instant -> {
+         String commitTime = table.getMetaClient().createNewInstantTime();
+         table.scheduleRollback(context, commitTime, instant, false, config.shouldRollbackUsingMarkers(), false);
+         table.rollback(context, commitTime, instant, true, true);
+       });
+       table.getMetaClient().reloadActiveTimeline();
+     }
      return recordsAndPendingClusteringFileGroups.getLeft();
+   } finally {
+     context.clearJobStatus();
Review Comment:
Could you check here for lazy execution too?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java:
##########
@@ -175,32 +175,36 @@ protected JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport
    HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
    context.setJobStatus(this.getClass().getSimpleName(), "Build records for import: " + cfg.tableName);
-   return jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
-       job.getConfiguration())
-       // To reduce large number of tasks.
-       .coalesce(16 * cfg.parallelism).map(entry -> {
-         GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2();
-         Object partitionField = genericRecord.get(cfg.partitionKey);
-         if (partitionField == null) {
-           throw new HoodieIOException("partition key is missing. :" + cfg.partitionKey);
-         }
-         Object rowField = genericRecord.get(cfg.rowKey);
-         if (rowField == null) {
-           throw new HoodieIOException("row field is missing. :" + cfg.rowKey);
-         }
-         String partitionPath = partitionField.toString();
-         LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
-         if (partitionField instanceof Number) {
-           try {
-             long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
-             partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
-           } catch (NumberFormatException nfe) {
-             LOG.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
+   try {
+     return jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
+         job.getConfiguration())
+         // To reduce large number of tasks.
+         .coalesce(16 * cfg.parallelism).map(entry -> {
+           GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2();
+           Object partitionField = genericRecord.get(cfg.partitionKey);
+           if (partitionField == null) {
+             throw new HoodieIOException("partition key is missing. :" + cfg.partitionKey);
+           }
+           Object rowField = genericRecord.get(cfg.rowKey);
+           if (rowField == null) {
+             throw new HoodieIOException("row field is missing. :" + cfg.rowKey);
+           }
+           String partitionPath = partitionField.toString();
+           LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
+           if (partitionField instanceof Number) {
+             try {
+               long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
+               partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
+             } catch (NumberFormatException nfe) {
+               LOG.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
+             }
            }
-         }
-         return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), partitionPath),
-             new HoodieJsonPayload(genericRecord.toString()));
-       });
+           return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), partitionPath),
+               new HoodieJsonPayload(genericRecord.toString()));
+         });
+   } finally {
+     context.clearJobStatus();
Review Comment:
This one too.
##########
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java:
##########
@@ -189,32 +189,36 @@ public JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport(Ja
    HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
    context.setJobStatus(this.getClass().getSimpleName(), "Build records for import: " + this.tableName);
-   return jsc.newAPIHadoopFile(this.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
-       job.getConfiguration())
-       // To reduce large number of tasks.
-       .coalesce(16 * this.parallelism).map(entry -> {
-         GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2();
-         Object partitionField = genericRecord.get(this.partitionKey);
-         if (partitionField == null) {
-           throw new HoodieIOException("partition key is missing. :" + this.partitionKey);
-         }
-         Object rowField = genericRecord.get(this.rowKey);
-         if (rowField == null) {
-           throw new HoodieIOException("row field is missing. :" + this.rowKey);
-         }
-         String partitionPath = partitionField.toString();
-         LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
-         if (partitionField instanceof Number) {
-           try {
-             long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
-             partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
-           } catch (NumberFormatException nfe) {
-             LOG.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
+   try {
+     return jsc.newAPIHadoopFile(this.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
+         job.getConfiguration())
+         // To reduce large number of tasks.
+         .coalesce(16 * this.parallelism).map(entry -> {
+           GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2();
+           Object partitionField = genericRecord.get(this.partitionKey);
+           if (partitionField == null) {
+             throw new HoodieIOException("partition key is missing. :" + this.partitionKey);
+           }
+           Object rowField = genericRecord.get(this.rowKey);
+           if (rowField == null) {
+             throw new HoodieIOException("row field is missing. :" + this.rowKey);
            }
-         }
-         return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), partitionPath),
-             new HoodieJsonPayload(genericRecord.toString()));
-       });
+           String partitionPath = partitionField.toString();
+           LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
+           if (partitionField instanceof Number) {
+             try {
+               long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
+               partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
+             } catch (NumberFormatException nfe) {
+               LOG.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
+             }
+           }
+           return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), partitionPath),
+               new HoodieJsonPayload(genericRecord.toString()));
+         });
+   } finally {
+     context.clearJobStatus();
Review Comment:
This one too.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1823,37 +1856,12 @@ public boolean hasNext() {
          public HoodieRecord next() {
            return forDelete
                ? HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next())
-               : HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileSlice.getFileId(), fileSlice.getBaseInstantTime(), 0);
+               : HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileId, instantTime, 0);
          }
        };
-     }
-     final HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-     final String filename = baseFile.getFileName();
-     Path dataFilePath = new Path(basePath, partition + Path.SEPARATOR + filename);
-
-     final String fileId = baseFile.getFileId();
-     final String instantTime = baseFile.getCommitTime();
-     HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(configuration.get(), dataFilePath);
-     ClosableIterator<String> recordKeyIterator = reader.getRecordKeyIterator();
-
-     return new ClosableIterator<HoodieRecord>() {
-       @Override
-       public void close() {
-         recordKeyIterator.close();
-       }
-
-       @Override
-       public boolean hasNext() {
-         return recordKeyIterator.hasNext();
-       }
-
-       @Override
-       public HoodieRecord next() {
-         return forDelete
-             ? HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next())
-             : HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileId, instantTime, 0);
-       }
-     };
-   });
+   });
+   } finally {
+     engineContext.clearJobStatus();
Review Comment:
Check this one too for lazy execution.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java:
##########
@@ -315,10 +315,14 @@ private HoodieData<BootstrapWriteStatus> runMetadataBootstrap(List<Pair<String,
        .collect(Collectors.toList());
    context.setJobStatus(this.getClass().getSimpleName(), "Run metadata-only bootstrap operation: " + config.getTableName());
-   return context.parallelize(
-       bootstrapPaths, Math.min(bootstrapPaths.size(), config.getBootstrapParallelism()))
+   try {
+     return context.parallelize(
+         bootstrapPaths, Math.min(bootstrapPaths.size(), config.getBootstrapParallelism()))
        .map(partitionFsPair -> getMetadataHandler(config, table, partitionFsPair.getRight().getRight()).runMetadataBootstrap(partitionFsPair.getLeft(),
-           partitionFsPair.getRight().getLeft(), keyGenerator));
+           partitionFsPair.getRight().getLeft(), keyGenerator));
+   } finally {
+     context.clearJobStatus();
Review Comment:
This method only composes a DAG; the stages are triggered later by lazy execution.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java:
##########
@@ -203,41 +207,45 @@ private void exportAsHudi(JavaSparkContext jsc, FileSystem sourceFs,
    final HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
    final SerializableConfiguration serConf = context.getHadoopConf();
    context.setJobStatus(this.getClass().getSimpleName(), "Exporting as HUDI dataset");
-   List<Pair<String, String>> partitionAndFileList = context.flatMap(partitions, partition -> {
-     // Only take latest version files <= latestCommit.
-     List<Pair<String, String>> filePaths = fsView
-         .getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp)
-         .map(f -> Pair.of(partition, f.getPath()))
-         .collect(Collectors.toList());
-     // also need to copy over partition metadata
-     FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
-     Path partitionMetaFile = HoodiePartitionMetadata.getPartitionMetafilePath(fs,
-         FSUtils.getPartitionPath(cfg.sourceBasePath, partition)).get();
-     if (fs.exists(partitionMetaFile)) {
-       filePaths.add(Pair.of(partition, partitionMetaFile.toString()));
-     }
-     return filePaths.stream();
-   }, parallelism);
-
-   context.foreach(partitionAndFileList, partitionAndFile -> {
-     String partition = partitionAndFile.getLeft();
-     Path sourceFilePath = new Path(partitionAndFile.getRight());
-     Path toPartitionPath = FSUtils.getPartitionPath(cfg.targetOutputPath, partition);
-     FileSystem executorSourceFs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
-     FileSystem executorOutputFs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
-
-     if (!executorOutputFs.exists(toPartitionPath)) {
-       executorOutputFs.mkdirs(toPartitionPath);
-     }
-     FileUtil.copy(
-         executorSourceFs,
-         sourceFilePath,
-         executorOutputFs,
-         new Path(toPartitionPath, sourceFilePath.getName()),
-         false,
-         false,
-         executorOutputFs.getConf());
-   }, parallelism);
+   try {
+     List<Pair<String, String>> partitionAndFileList = context.flatMap(partitions, partition -> {
+       // Only take latest version files <= latestCommit.
+       List<Pair<String, String>> filePaths = fsView
+           .getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp)
+           .map(f -> Pair.of(partition, f.getPath()))
+           .collect(Collectors.toList());
+       // also need to copy over partition metadata
+       FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
+       Path partitionMetaFile = HoodiePartitionMetadata.getPartitionMetafilePath(fs,
+           FSUtils.getPartitionPath(cfg.sourceBasePath, partition)).get();
+       if (fs.exists(partitionMetaFile)) {
+         filePaths.add(Pair.of(partition, partitionMetaFile.toString()));
+       }
+       return filePaths.stream();
+     }, parallelism);
+
+     context.foreach(partitionAndFileList, partitionAndFile -> {
+       String partition = partitionAndFile.getLeft();
+       Path sourceFilePath = new Path(partitionAndFile.getRight());
+       Path toPartitionPath = FSUtils.getPartitionPath(cfg.targetOutputPath, partition);
+       FileSystem executorSourceFs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
+       FileSystem executorOutputFs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
+
+       if (!executorOutputFs.exists(toPartitionPath)) {
+         executorOutputFs.mkdirs(toPartitionPath);
+       }
+       FileUtil.copy(
+           executorSourceFs,
+           sourceFilePath,
+           executorOutputFs,
+           new Path(toPartitionPath, sourceFilePath.getName()),
+           false,
+           false,
+           executorOutputFs.getConf());
+     }, parallelism);
+   } finally {
+     context.clearJobStatus();
Review Comment:
This should be at the end of the method, correct, since `context.foreach` also triggers Spark stages?
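To make the contrast concrete: a minimal, hypothetical sketch of why clearing after an *eager* operation is safe, using `java.util.List.forEach` (which executes immediately, like `context.foreach` launching Spark stages) as the analogy. `EagerStatusDemo` and `jobStatus` are illustrative names, not Hudi or Spark APIs.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class EagerStatusDemo {
  // Hypothetical stand-in for the engine context's job status.
  static final AtomicReference<String> jobStatus = new AtomicReference<>("");

  // forEach is a terminal (eager) operation: all work completes inside the
  // try block, so the finally clause only clears the status afterwards.
  public static String copyAll(List<String> files) {
    jobStatus.set("Exporting as HUDI dataset");
    StringBuilder observed = new StringBuilder();
    try {
      files.forEach(f -> observed.append(jobStatus.get())); // runs now, eagerly
      return observed.toString();
    } finally {
      jobStatus.set(""); // reached only after the work has finished
    }
  }
}
```

With eager operations like this, each "task" observes the status that was set, unlike a lazily-composed pipeline where the `finally` fires before the stages run.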
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1731,37 +1731,41 @@ public static HoodieData<HoodieRecord> readRecordKeysFromBaseFiles(HoodieEngineC
    }
    engineContext.setJobStatus(activeModule, "Record Index: reading record keys from " + partitionBaseFilePairs.size() + " base files");
-   final int parallelism = Math.min(partitionBaseFilePairs.size(), recordIndexMaxParallelism);
-   return engineContext.parallelize(partitionBaseFilePairs, parallelism).flatMap(partitionAndBaseFile -> {
-     final String partition = partitionAndBaseFile.getKey();
-     final HoodieBaseFile baseFile = partitionAndBaseFile.getValue();
-     final String filename = baseFile.getFileName();
-     Path dataFilePath = new Path(basePath, partition + Path.SEPARATOR + filename);
-
-     final String fileId = baseFile.getFileId();
-     final String instantTime = baseFile.getCommitTime();
-     HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(configuration.get(), dataFilePath);
-     ClosableIterator<String> recordKeyIterator = reader.getRecordKeyIterator();
-
-     return new ClosableIterator<HoodieRecord>() {
-       @Override
-       public void close() {
-         recordKeyIterator.close();
-       }
+   try {
+     final int parallelism = Math.min(partitionBaseFilePairs.size(), recordIndexMaxParallelism);
+     return engineContext.parallelize(partitionBaseFilePairs, parallelism).flatMap(partitionAndBaseFile -> {
+       final String partition = partitionAndBaseFile.getKey();
+       final HoodieBaseFile baseFile = partitionAndBaseFile.getValue();
+       final String filename = baseFile.getFileName();
+       Path dataFilePath = new Path(basePath, partition + Path.SEPARATOR + filename);
+
+       final String fileId = baseFile.getFileId();
+       final String instantTime = baseFile.getCommitTime();
+       HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(configuration.get(), dataFilePath);
+       ClosableIterator<String> recordKeyIterator = reader.getRecordKeyIterator();
-       @Override
-       public boolean hasNext() {
-         return recordKeyIterator.hasNext();
-       }
+       return new ClosableIterator<HoodieRecord>() {
+         @Override
+         public void close() {
+           recordKeyIterator.close();
+         }
-       @Override
-       public HoodieRecord next() {
-         return forDelete
-             ? HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next())
-             : HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileId, instantTime, 0);
-       }
-     };
-   });
+         @Override
+         public boolean hasNext() {
+           return recordKeyIterator.hasNext();
+         }
+
+         @Override
+         public HoodieRecord next() {
+           return forDelete
+               ? HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next())
+               : HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileId, instantTime, 0);
+         }
+       };
+     });
+   } finally {
+     engineContext.clearJobStatus();
Review Comment:
This may be subject to lazy execution.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -165,20 +169,24 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRec
    // Handle records update with clustering
    HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = clusteringHandleUpdate(inputRecords);
-   context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile:" + config.getTableName());
-   WorkloadProfile workloadProfile =
-       new WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate), operationType, table.getIndex().canIndexLogFiles());
-   LOG.debug("Input workload profile :" + workloadProfile);
-
-   // partition using the insert partitioner
-   final Partitioner partitioner = getPartitioner(workloadProfile);
-   saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
-
-   context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data: " + config.getTableName());
-   HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
-   HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
-   updateIndexAndCommitIfNeeded(writeStatuses, result);
-   return result;
+   try {
+     context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile:" + config.getTableName());
+     WorkloadProfile workloadProfile =
+         new WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate), operationType, table.getIndex().canIndexLogFiles());
+     LOG.debug("Input workload profile :" + workloadProfile);
+
+     // partition using the insert partitioner
+     final Partitioner partitioner = getPartitioner(workloadProfile);
+     saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
+
+     context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data: " + config.getTableName());
+     HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
+     HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
+     updateIndexAndCommitIfNeeded(writeStatuses, result);
+     return result;
+   } finally {
+     context.clearJobStatus();
Review Comment:
Check here too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]