yihua commented on code in PR #9899:
URL: https://github.com/apache/hudi/pull/9899#discussion_r1369141606


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java:
##########
@@ -215,6 +219,7 @@ protected List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(
     String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
 
     List<Pair<String, HoodieBaseFile>> baseFilesForAllPartitions = HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable);
+    context.clearJobStatus();

Review Comment:
   This shouldn't be added. Key range loading has not finished here.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -758,7 +762,6 @@ protected void reconcileAgainstMarkers(HoodieEngineContext context,
         }
 
         // Now delete partially written files
-        context.setJobStatus(this.getClass().getSimpleName(), "Delete all partially written files: " + config.getTableName());

Review Comment:
   Why is this line being deleted? The deletion of partially written files below still benefits from a job status.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -61,6 +61,7 @@ public HoodieWriteMetadata<O> write(String instantTime,
        // perform index look up to get existing location of records
        context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + table.getConfig().getTableName());
        taggedRecords = tag(dedupedRecords, context, table);
+        context.clearJobStatus();

Review Comment:
   If lazy execution is triggered afterwards, the job status may not be properly populated when the stages actually run. Have you verified that this cannot happen at any call site?
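
   To illustrate the concern (a minimal sketch with hypothetical names, not Hudi's actual classes): with Spark-style lazy evaluation, a `clearJobStatus()` placed right after building a transformation wipes the description before any stage executes, so the UI shows nothing. A `Supplier` stands in for a lazy RDD here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for an engine context: it records the job status
// observed at the moment each "stage" actually executes.
class SketchContext {
    private String jobStatus = "";
    final List<String> observedAtExecution = new ArrayList<>();

    void setJobStatus(String s) { jobStatus = s; }
    void clearJobStatus() { jobStatus = ""; }

    // A "transformation": nothing runs yet, mirroring a lazy RDD map().
    Supplier<Integer> lazyStage(Supplier<Integer> upstream) {
        return () -> {
            observedAtExecution.add(jobStatus); // what the UI would show
            return upstream.get() + 1;
        };
    }
}

public class LazyStatusDemo {
    public static List<String> run() {
        SketchContext context = new SketchContext();
        context.setJobStatus("Tagging: my_table");
        Supplier<Integer> tagged = context.lazyStage(() -> 0); // DAG built only
        context.clearJobStatus(); // cleared before any action runs
        tagged.get();             // the "action" fires here; status already gone
        return context.observedAtExecution;
    }
}
```

   When `run()` executes, the status recorded at execution time is the empty string: clearing is only safe after an eager action has materialized the result.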



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -111,44 +111,48 @@ public BaseSparkCommitActionExecutor(HoodieEngineContext context,
 
   private HoodieData<HoodieRecord<T>> clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
     context.setJobStatus(this.getClass().getSimpleName(), "Handling updates which are under clustering: " + config.getTableName());
-    Set<HoodieFileGroupId> fileGroupsInPendingClustering =
-        table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
-    // Skip processing if there is no inflight clustering
-    if (fileGroupsInPendingClustering.isEmpty()) {
-      return inputRecords;
-    }
+    try {
+      Set<HoodieFileGroupId> fileGroupsInPendingClustering =
+          table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
+      // Skip processing if there is no inflight clustering
+      if (fileGroupsInPendingClustering.isEmpty()) {
+        return inputRecords;
+      }
 
-    UpdateStrategy<T, HoodieData<HoodieRecord<T>>> updateStrategy = (UpdateStrategy<T, HoodieData<HoodieRecord<T>>>) ReflectionUtils
-        .loadClass(config.getClusteringUpdatesStrategyClass(), new Class<?>[] {HoodieEngineContext.class, HoodieTable.class, Set.class},
-            this.context, table, fileGroupsInPendingClustering);
-    // For SparkAllowUpdateStrategy with rollback pending clustering as false, need not handle
-    // the file group intersection between current ingestion and pending clustering file groups.
-    // This will be handled at the conflict resolution strategy.
-    if (updateStrategy instanceof SparkAllowUpdateStrategy && !config.isRollbackPendingClustering()) {
-      return inputRecords;
-    }
-    Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
-        updateStrategy.handleUpdate(inputRecords);
+      UpdateStrategy<T, HoodieData<HoodieRecord<T>>> updateStrategy = (UpdateStrategy<T, HoodieData<HoodieRecord<T>>>) ReflectionUtils
+          .loadClass(config.getClusteringUpdatesStrategyClass(), new Class<?>[] {HoodieEngineContext.class, HoodieTable.class, Set.class},
+              this.context, table, fileGroupsInPendingClustering);
+      // For SparkAllowUpdateStrategy with rollback pending clustering as false, need not handle
+      // the file group intersection between current ingestion and pending clustering file groups.
+      // This will be handled at the conflict resolution strategy.
+      if (updateStrategy instanceof SparkAllowUpdateStrategy && !config.isRollbackPendingClustering()) {
+        return inputRecords;
+      }
+      Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
+          updateStrategy.handleUpdate(inputRecords);
 
-    Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = recordsAndPendingClusteringFileGroups.getRight();
-    if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
+      Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = recordsAndPendingClusteringFileGroups.getRight();
+      if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
+        return recordsAndPendingClusteringFileGroups.getLeft();
+      }
+      // there are file groups pending clustering and receiving updates, so rollback the pending clustering instants
+      // there could be race condition, for example, if the clustering completes after instants are fetched but before rollback completed
+      if (config.isRollbackPendingClustering()) {
+        Set<HoodieInstant> pendingClusteringInstantsToRollback = getAllFileGroupsInPendingClusteringPlans(table.getMetaClient()).entrySet().stream()
+            .filter(e -> fileGroupsWithUpdatesAndPendingClustering.contains(e.getKey()))
+            .map(Map.Entry::getValue)
+            .collect(Collectors.toSet());
+        pendingClusteringInstantsToRollback.forEach(instant -> {
+          String commitTime = table.getMetaClient().createNewInstantTime();
+          table.scheduleRollback(context, commitTime, instant, false, config.shouldRollbackUsingMarkers(), false);
+          table.rollback(context, commitTime, instant, true, true);
+        });
+        table.getMetaClient().reloadActiveTimeline();
+      }
       return recordsAndPendingClusteringFileGroups.getLeft();
+    } finally {
+      context.clearJobStatus();

Review Comment:
   Could you check here for lazy execution too?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java:
##########
@@ -175,32 +175,36 @@ protected JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport
 
     HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
     context.setJobStatus(this.getClass().getSimpleName(), "Build records for import: " + cfg.tableName);
-    return jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
-            job.getConfiguration())
-        // To reduce large number of tasks.
-        .coalesce(16 * cfg.parallelism).map(entry -> {
-          GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2();
-          Object partitionField = genericRecord.get(cfg.partitionKey);
-          if (partitionField == null) {
-            throw new HoodieIOException("partition key is missing. :" + cfg.partitionKey);
-          }
-          Object rowField = genericRecord.get(cfg.rowKey);
-          if (rowField == null) {
-            throw new HoodieIOException("row field is missing. :" + cfg.rowKey);
-          }
-          String partitionPath = partitionField.toString();
-          LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
-          if (partitionField instanceof Number) {
-            try {
-              long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
-              partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
-            } catch (NumberFormatException nfe) {
-              LOG.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
+    try {
+      return jsc.newAPIHadoopFile(cfg.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
+          job.getConfiguration())
+          // To reduce large number of tasks.
+          .coalesce(16 * cfg.parallelism).map(entry -> {
+            GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2();
+            Object partitionField = genericRecord.get(cfg.partitionKey);
+            if (partitionField == null) {
+              throw new HoodieIOException("partition key is missing. :" + cfg.partitionKey);
+            }
+            Object rowField = genericRecord.get(cfg.rowKey);
+            if (rowField == null) {
+              throw new HoodieIOException("row field is missing. :" + cfg.rowKey);
+            }
+            String partitionPath = partitionField.toString();
+            LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
+            if (partitionField instanceof Number) {
+              try {
+                long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
+                partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
+              } catch (NumberFormatException nfe) {
+                LOG.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
+              }
             }
-          }
-          return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), partitionPath),
-              new HoodieJsonPayload(genericRecord.toString()));
-        });
+            return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), partitionPath),
+                new HoodieJsonPayload(genericRecord.toString()));
+          });
+    } finally {
+      context.clearJobStatus();

Review Comment:
   This one too.



##########
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java:
##########
@@ -189,32 +189,36 @@ public JavaRDD<HoodieRecord<HoodieRecordPayload>> buildHoodieRecordsForImport(Ja
 
     HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
     context.setJobStatus(this.getClass().getSimpleName(), "Build records for import: " + this.tableName);
-    return jsc.newAPIHadoopFile(this.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
-            job.getConfiguration())
-        // To reduce large number of tasks.
-        .coalesce(16 * this.parallelism).map(entry -> {
-          GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2();
-          Object partitionField = genericRecord.get(this.partitionKey);
-          if (partitionField == null) {
-            throw new HoodieIOException("partition key is missing. :" + this.partitionKey);
-          }
-          Object rowField = genericRecord.get(this.rowKey);
-          if (rowField == null) {
-            throw new HoodieIOException("row field is missing. :" + this.rowKey);
-          }
-          String partitionPath = partitionField.toString();
-          LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
-          if (partitionField instanceof Number) {
-            try {
-              long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
-              partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
-            } catch (NumberFormatException nfe) {
-              LOG.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
+    try {
+      return jsc.newAPIHadoopFile(this.srcPath, ParquetInputFormat.class, Void.class, GenericRecord.class,
+          job.getConfiguration())
+          // To reduce large number of tasks.
+          .coalesce(16 * this.parallelism).map(entry -> {
+            GenericRecord genericRecord = ((Tuple2<Void, GenericRecord>) entry)._2();
+            Object partitionField = genericRecord.get(this.partitionKey);
+            if (partitionField == null) {
+              throw new HoodieIOException("partition key is missing. :" + this.partitionKey);
+            }
+            Object rowField = genericRecord.get(this.rowKey);
+            if (rowField == null) {
+              throw new HoodieIOException("row field is missing. :" + this.rowKey);
             }
-          }
-          return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), partitionPath),
-              new HoodieJsonPayload(genericRecord.toString()));
-        });
+            String partitionPath = partitionField.toString();
+            LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
+            if (partitionField instanceof Number) {
+              try {
+                long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
+                partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
+              } catch (NumberFormatException nfe) {
+                LOG.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
+              }
+            }
+            return new HoodieAvroRecord<>(new HoodieKey(rowField.toString(), partitionPath),
+                new HoodieJsonPayload(genericRecord.toString()));
+          });
+    } finally {
+      context.clearJobStatus();

Review Comment:
   This one too.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1823,37 +1856,12 @@ public boolean hasNext() {
           public HoodieRecord next() {
             return forDelete
                 ? HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next())
-                : HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileSlice.getFileId(), fileSlice.getBaseInstantTime(), 0);
+                : HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileId, instantTime, 0);
           }
         };
-      }
-      final HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
-      final String filename = baseFile.getFileName();
-      Path dataFilePath = new Path(basePath, partition + Path.SEPARATOR + filename);
-
-      final String fileId = baseFile.getFileId();
-      final String instantTime = baseFile.getCommitTime();
-      HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(configuration.get(), dataFilePath);
-      ClosableIterator<String> recordKeyIterator = reader.getRecordKeyIterator();
-
-      return new ClosableIterator<HoodieRecord>() {
-        @Override
-        public void close() {
-          recordKeyIterator.close();
-        }
-
-        @Override
-        public boolean hasNext() {
-          return recordKeyIterator.hasNext();
-        }
-
-        @Override
-        public HoodieRecord next() {
-          return forDelete
-              ? HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next())
-              : HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileId, instantTime, 0);
-        }
-      };
-    });
+      });
+    } finally {
+      engineContext.clearJobStatus();

Review Comment:
   Check this one too for lazy execution.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java:
##########
@@ -315,10 +315,14 @@ private HoodieData<BootstrapWriteStatus> runMetadataBootstrap(List<Pair<String,
         .collect(Collectors.toList());
 
     context.setJobStatus(this.getClass().getSimpleName(), "Run metadata-only bootstrap operation: " + config.getTableName());
-    return context.parallelize(
-            bootstrapPaths, Math.min(bootstrapPaths.size(), config.getBootstrapParallelism()))
+    try {
+      return context.parallelize(
+          bootstrapPaths, Math.min(bootstrapPaths.size(), config.getBootstrapParallelism()))
         .map(partitionFsPair -> getMetadataHandler(config, table, partitionFsPair.getRight().getRight()).runMetadataBootstrap(partitionFsPair.getLeft(),
-            partitionFsPair.getRight().getLeft(), keyGenerator));
+              partitionFsPair.getRight().getLeft(), keyGenerator));
+    } finally {
+      context.clearJobStatus();

Review Comment:
   This method only composes a DAG; execution is triggered lazily later, so the status would be cleared before any stage runs.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotExporter.java:
##########
@@ -203,41 +207,45 @@ private void exportAsHudi(JavaSparkContext jsc, FileSystem sourceFs,
     final HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
     final SerializableConfiguration serConf = context.getHadoopConf();
     context.setJobStatus(this.getClass().getSimpleName(), "Exporting as HUDI dataset");
-    List<Pair<String, String>> partitionAndFileList = context.flatMap(partitions, partition -> {
-      // Only take latest version files <= latestCommit.
-      List<Pair<String, String>> filePaths = fsView
-          .getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp)
-          .map(f -> Pair.of(partition, f.getPath()))
-          .collect(Collectors.toList());
-      // also need to copy over partition metadata
-      FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
-      Path partitionMetaFile = HoodiePartitionMetadata.getPartitionMetafilePath(fs,
-          FSUtils.getPartitionPath(cfg.sourceBasePath, partition)).get();
-      if (fs.exists(partitionMetaFile)) {
-        filePaths.add(Pair.of(partition, partitionMetaFile.toString()));
-      }
-      return filePaths.stream();
-    }, parallelism);
-
-    context.foreach(partitionAndFileList, partitionAndFile -> {
-      String partition = partitionAndFile.getLeft();
-      Path sourceFilePath = new Path(partitionAndFile.getRight());
-      Path toPartitionPath = FSUtils.getPartitionPath(cfg.targetOutputPath, partition);
-      FileSystem executorSourceFs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
-      FileSystem executorOutputFs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
-
-      if (!executorOutputFs.exists(toPartitionPath)) {
-        executorOutputFs.mkdirs(toPartitionPath);
-      }
-      FileUtil.copy(
-          executorSourceFs,
-          sourceFilePath,
-          executorOutputFs,
-          new Path(toPartitionPath, sourceFilePath.getName()),
-          false,
-          false,
-          executorOutputFs.getConf());
-    }, parallelism);
+    try {
+      List<Pair<String, String>> partitionAndFileList = context.flatMap(partitions, partition -> {
+        // Only take latest version files <= latestCommit.
+        List<Pair<String, String>> filePaths = fsView
+            .getLatestBaseFilesBeforeOrOn(partition, latestCommitTimestamp)
+            .map(f -> Pair.of(partition, f.getPath()))
+            .collect(Collectors.toList());
+        // also need to copy over partition metadata
+        FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
+        Path partitionMetaFile = HoodiePartitionMetadata.getPartitionMetafilePath(fs,
+            FSUtils.getPartitionPath(cfg.sourceBasePath, partition)).get();
+        if (fs.exists(partitionMetaFile)) {
+          filePaths.add(Pair.of(partition, partitionMetaFile.toString()));
+        }
+        return filePaths.stream();
+      }, parallelism);
+
+      context.foreach(partitionAndFileList, partitionAndFile -> {
+        String partition = partitionAndFile.getLeft();
+        Path sourceFilePath = new Path(partitionAndFile.getRight());
+        Path toPartitionPath = FSUtils.getPartitionPath(cfg.targetOutputPath, partition);
+        FileSystem executorSourceFs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
+        FileSystem executorOutputFs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
+
+        if (!executorOutputFs.exists(toPartitionPath)) {
+          executorOutputFs.mkdirs(toPartitionPath);
+        }
+        FileUtil.copy(
+            executorSourceFs,
+            sourceFilePath,
+            executorOutputFs,
+            new Path(toPartitionPath, sourceFilePath.getName()),
+            false,
+            false,
+            executorOutputFs.getConf());
+      }, parallelism);
+    } finally {
+      context.clearJobStatus();

Review Comment:
   This should be at the end of the method, correct, since `context.foreach` also triggers Spark stages?
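
   For contrast with the lazy case: when every operation inside the try block is eager (as `context.flatMap` and `context.foreach` are here, each triggering Spark stages immediately), the try/finally pattern is safe. A minimal sketch with hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the safe placement: all eager actions run inside the try block,
// so each one still observes the status; the finally clause clears it only
// after the last stage has completed.
public class EagerStatusDemo {
    private String jobStatus = "";
    private final List<String> observed = new ArrayList<>();

    void setJobStatus(String s) { jobStatus = s; }
    void clearJobStatus() { jobStatus = ""; }

    // An eager "action", mirroring context.foreach(): executes immediately.
    void eagerAction() { observed.add(jobStatus); }

    public List<String> export() {
        setJobStatus("Exporting as HUDI dataset");
        try {
            eagerAction(); // e.g. the flatMap listing files
            eagerAction(); // e.g. the foreach copying them
            return observed;
        } finally {
            clearJobStatus(); // runs after both stages have finished
        }
    }
}
```

   Both recorded statuses are populated, because nothing deferred escapes the try block.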



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1731,37 +1731,41 @@ public static HoodieData<HoodieRecord> readRecordKeysFromBaseFiles(HoodieEngineC
     }
 
     engineContext.setJobStatus(activeModule, "Record Index: reading record keys from " + partitionBaseFilePairs.size() + " base files");
-    final int parallelism = Math.min(partitionBaseFilePairs.size(), recordIndexMaxParallelism);
-    return engineContext.parallelize(partitionBaseFilePairs, parallelism).flatMap(partitionAndBaseFile -> {
-      final String partition = partitionAndBaseFile.getKey();
-      final HoodieBaseFile baseFile = partitionAndBaseFile.getValue();
-      final String filename = baseFile.getFileName();
-      Path dataFilePath = new Path(basePath, partition + Path.SEPARATOR + filename);
-
-      final String fileId = baseFile.getFileId();
-      final String instantTime = baseFile.getCommitTime();
-      HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(configuration.get(), dataFilePath);
-      ClosableIterator<String> recordKeyIterator = reader.getRecordKeyIterator();
-
-      return new ClosableIterator<HoodieRecord>() {
-        @Override
-        public void close() {
-          recordKeyIterator.close();
-        }
+    try {
+      final int parallelism = Math.min(partitionBaseFilePairs.size(), recordIndexMaxParallelism);
+      return engineContext.parallelize(partitionBaseFilePairs, parallelism).flatMap(partitionAndBaseFile -> {
+        final String partition = partitionAndBaseFile.getKey();
+        final HoodieBaseFile baseFile = partitionAndBaseFile.getValue();
+        final String filename = baseFile.getFileName();
+        Path dataFilePath = new Path(basePath, partition + Path.SEPARATOR + filename);
+
+        final String fileId = baseFile.getFileId();
+        final String instantTime = baseFile.getCommitTime();
+        HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(configuration.get(), dataFilePath);
+        ClosableIterator<String> recordKeyIterator = reader.getRecordKeyIterator();
 
-        @Override
-        public boolean hasNext() {
-          return recordKeyIterator.hasNext();
-        }
+        return new ClosableIterator<HoodieRecord>() {
+          @Override
+          public void close() {
+            recordKeyIterator.close();
+          }
 
-        @Override
-        public HoodieRecord next() {
-          return forDelete
-              ? HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next())
-              : HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileId, instantTime, 0);
-        }
-      };
-    });
+          @Override
+          public boolean hasNext() {
+            return recordKeyIterator.hasNext();
+          }
+
+          @Override
+          public HoodieRecord next() {
+            return forDelete
+                ? HoodieMetadataPayload.createRecordIndexDelete(recordKeyIterator.next())
+                : HoodieMetadataPayload.createRecordIndexUpdate(recordKeyIterator.next(), partition, fileId, instantTime, 0);
+          }
+        };
+      });
+    } finally {
+      engineContext.clearJobStatus();

Review Comment:
   This may be subject to lazy execution.
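
   A Java-semantics detail worth noting for this hunk: a `finally` clause runs when the method returns, not when the caller later consumes the returned lazy collection. So the status is cleared before any stage that the returned value triggers. A minimal sketch (hypothetical names, with a `Supplier` standing in for the lazily evaluated return value):

```java
import java.util.function.Supplier;

// Sketch of the return-inside-try pitfall: the lazy result reads the status
// only when evaluated, but finally has already cleared it by then.
public class ReturnFinallyDemo {
    private String jobStatus = "";

    void setJobStatus(String s) { jobStatus = s; }
    void clearJobStatus() { jobStatus = ""; }

    public Supplier<String> buildLazy() {
        setJobStatus("Record Index: reading record keys");
        try {
            // Nothing is evaluated here; jobStatus is read only on .get().
            return () -> jobStatus;
        } finally {
            // Executes as the method returns, before the caller's .get().
            clearJobStatus();
        }
    }
}
```

   Evaluating the returned supplier yields an empty status, which is why the try/finally placement alone does not guarantee the stages run with the status set.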



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java:
##########
@@ -165,20 +169,24 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRec
     // Handle records update with clustering
     HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = clusteringHandleUpdate(inputRecords);
 
-    context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile:" + config.getTableName());
-    WorkloadProfile workloadProfile =
-        new WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate), operationType, table.getIndex().canIndexLogFiles());
-    LOG.debug("Input workload profile :" + workloadProfile);
-
-    // partition using the insert partitioner
-    final Partitioner partitioner = getPartitioner(workloadProfile);
-    saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
-
-    context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data: " + config.getTableName());
-    HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
-    HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
-    updateIndexAndCommitIfNeeded(writeStatuses, result);
-    return result;
+    try {
+      context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile:" + config.getTableName());
+      WorkloadProfile workloadProfile =
+          new WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate), operationType, table.getIndex().canIndexLogFiles());
+      LOG.debug("Input workload profile :" + workloadProfile);
+
+      // partition using the insert partitioner
+      final Partitioner partitioner = getPartitioner(workloadProfile);
+      saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
+
+      context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and writing data: " + config.getTableName());
+      HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
+      HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
+      updateIndexAndCommitIfNeeded(writeStatuses, result);
+      return result;
+    } finally {
+      context.clearJobStatus();

Review Comment:
   Check here too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
