This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.14.2-prep
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/release-0.14.2-prep by this push:
     new 6ff2eb94af76 fix: cherry-picking for 0.14.2 release (#17924)
6ff2eb94af76 is described below

commit 6ff2eb94af76ab7c681f6a9209958a8288214289
Author: Lin Liu <[email protected]>
AuthorDate: Thu Mar 5 01:50:38 2026 -0800

    fix: cherry-picking for 0.14.2 release (#17924)
    
    * [HUDI-9088] Fix unnecessary scanning of target table in MERGE INTO on Spark (#12934)
    
    * [HUDI-8212] Add extra config of billing project ID for BigQuery sync (#11988)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
    
    * [HUDI-9288] Fixing HoodieFileGroup api related to uncommitted slices (#13125)
    
    * [HUDI-6868] Support extracting passwords from credential store for Hive Sync (#10577)
    
    Co-authored-by: Danny Chan <[email protected]>
    
    * [MINOR] [BRANCH-0.x] Added condition to check default value to fix extracting password from credential store (#11247)
    
    * [HUDI-9039] Run do init table transaction only when required (#12847)
    
    Co-authored-by: Leon Lin <[email protected]>
    
    * [HUDI-9681] Remove mkdir in partition listing and add try catch to listStatus of partition (#13739)
    
    * [HUDI-9770] During hive/glue sync, ensure drop partition events are generated when partition is present in the metastore (#13794)
    
    * [HUDI-7478] Fix max delta commits guard check w/ MDT (#10820)
    
    Co-authored-by: Vova Kolmakov <[email protected]>
    
    * [HUDI-8161] Make spark-sql command 'desc' independent from schema evolution config (#11871)
    
    Co-authored-by: Vova Kolmakov <[email protected]>
    
    * Fix CI failures
    
    * Add some preliminary change
    
    * Fix a CI test
    
    * Fix validation bundle failures
    
    * Try update docker_java17_test.sh
    
    ---------
    
    Co-authored-by: Y Ethan Guo <[email protected]>
    Co-authored-by: Aditya Goenka <[email protected]>
    Co-authored-by: Tim Brown <[email protected]>
    Co-authored-by: Danny Chan <[email protected]>
    Co-authored-by: Leon Lin <[email protected]>
    Co-authored-by: Leon Lin <[email protected]>
    Co-authored-by: Roushan Kumar <[email protected]>
    Co-authored-by: wombatu-kun <[email protected]>
    Co-authored-by: Vova Kolmakov <[email protected]>
    Co-authored-by: Vova Kolmakov <[email protected]>
---
 .github/workflows/bot.yml                          |  11 ++
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  22 ++-
 .../metadata/HoodieBackedTableMetadataWriter.java  |   4 +-
 .../functional/TestHoodieBackedMetadata.java       |  37 +++++
 .../apache/hudi/common/model/HoodieFileGroup.java  |   2 +-
 .../table/view/AbstractTableFileSystemView.java    |  22 +--
 .../metadata/FileSystemBackedTableMetadata.java    |   9 +-
 .../hudi/common/model/TestHoodieFileGroup.java     |   1 +
 .../hudi/gcp/bigquery/BigQuerySyncConfig.java      |  12 ++
 .../gcp/bigquery/HoodieBigQuerySyncClient.java     |   7 +-
 .../hudi/gcp/bigquery/TestBigQuerySyncConfig.java  |   3 +
 .../gcp/bigquery/TestHoodieBigQuerySyncClient.java |  33 ++++-
 hudi-integ-test/pom.xml                            |  10 +-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   2 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  16 ++-
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  51 +++----
 .../spark/sql/hudi/HoodieSparkSqlTestBase.scala    |  51 ++++++-
 .../apache/spark/sql/hudi/TestDescribeTable.scala  | 117 ++++++++++++++++
 .../apache/spark/sql/hudi/TestMergeIntoTable.scala | 150 +++++++++++++--------
 .../hudi/analysis/HoodieSpark32PlusAnalysis.scala  |   9 +-
 .../java/org/apache/hudi/hive/HiveSyncTool.java    |   1 +
 .../org/apache/hudi/hive/TestHiveSyncTool.java     | 123 +++++++++++++++++
 .../apache/hudi/sync/common/HoodieSyncClient.java  |  13 +-
 packaging/bundle-validation/ci_run.sh              |   6 +
 packaging/bundle-validation/validate.sh            |  26 ++--
 25 files changed, 619 insertions(+), 119 deletions(-)

diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index fd3cc67976a1..432d35981728 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -421,6 +421,17 @@ jobs:
           java-version: '8'
           distribution: 'adopt'
           architecture: x64
+      - name: Check disk space
+        run: df -h
+      - name: 'Free space'
+        run: |
+          sudo rm -rf /usr/share/dotnet
+          sudo rm -rf /usr/local/lib/android
+          sudo rm -rf /opt/ghc
+          sudo rm -rf /usr/local/share/boost
+          docker system prune --all --force --volumes
+      - name: Check disk space after cleanup
+        run: df -h
       - name: Build Project
         env:
           SPARK_PROFILE: ${{ matrix.sparkProfile }}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index d847468e4145..bfb811a15990 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.model.ActionType;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -1258,15 +1259,32 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
     if (instantTime.isPresent()) {
       ownerInstant = Option.of(new HoodieInstant(true, 
CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), 
instantTime.get()));
     }
-    this.txnManager.beginTransaction(ownerInstant, Option.empty());
-    try {
+
+    boolean requiresInitTable = needsUpgradeOrDowngrade(metaClient) || 
config.isMetadataTableEnabled();
+    if (!requiresInitTable) {
+      return;
+    }
+    executeUsingTxnManager(ownerInstant, () -> {
       tryUpgrade(metaClient, instantTime);
+      // TODO: this also does MT table management..
       initMetadataTable(instantTime);
+    });
+  }
+
+  private void executeUsingTxnManager(Option<HoodieInstant> ownerInstant, 
Runnable r) {
+    this.txnManager.beginTransaction(ownerInstant, Option.empty());
+    try {
+      r.run();
     } finally {
       this.txnManager.endTransaction(ownerInstant);
     }
   }
 
+  private boolean needsUpgradeOrDowngrade(HoodieTableMetaClient metaClient) {
+    UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade(metaClient, 
config, context, upgradeDowngradeHelper);
+    return 
upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current());
+  }
+
   /**
    * Bootstrap the metadata table.
    *
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 4687bf47c2c9..15838520edcc 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -97,7 +97,7 @@ import java.util.stream.IntStream;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS;
 import static org.apache.hudi.common.table.HoodieTableConfig.ARCHIVELOG_FOLDER;
 import static 
org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
-import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.getIndexInflightInstant;
 import static 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeIndexPlan;
@@ -762,7 +762,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
   protected static void checkNumDeltaCommits(HoodieTableMetaClient metaClient, 
int maxNumDeltaCommitsWhenPending) {
     final HoodieActiveTimeline activeTimeline = 
metaClient.reloadActiveTimeline();
     Option<HoodieInstant> lastCompaction = 
activeTimeline.filterCompletedInstants()
-        .filter(s -> s.getAction().equals(COMPACTION_ACTION)).lastInstant();
+        .filter(s -> s.getAction().equals(COMMIT_ACTION)).lastInstant();
     int numDeltaCommits = lastCompaction.isPresent()
         ? 
activeTimeline.getDeltaCommitTimeline().findInstantsAfter(lastCompaction.get().getTimestamp()).countInstants()
         : activeTimeline.getDeltaCommitTimeline().countInstants();
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index ffeade2af57c..739bc5df9361 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -165,6 +165,7 @@ import static 
org.apache.hudi.common.model.WriteOperationType.DELETE;
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
 import static 
org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_EXTENSION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_EXTENSION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.INFLIGHT_EXTENSION;
@@ -2886,6 +2887,42 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     assertTrue(t.getMessage().startsWith(String.format("Metadata table's 
deltacommits exceeded %d: ", maxNumDeltacommits)));
   }
 
+  @Test
+  public void testMORCheckNumDeltaCommits() throws Exception {
+    init(MERGE_ON_READ, true);
+    final int maxNumDeltaCommits = 3;
+    writeConfig = getWriteConfigBuilder(true, true, false)
+            .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+                    .enable(true)
+                    .enableMetrics(false)
+                    .withMaxNumDeltaCommitsBeforeCompaction(maxNumDeltaCommits 
- 1)
+                    .withMaxNumDeltacommitsWhenPending(maxNumDeltaCommits)
+                    .build())
+            .build();
+    initWriteConfigAndMetatableWriter(writeConfig, true);
+    // write deltacommits to data-table and do compaction in metadata-table 
(with commit-instant)
+    doWriteOperation(testTable, HoodieActiveTimeline.createNewInstantTime(1));
+    doWriteOperation(testTable, HoodieActiveTimeline.createNewInstantTime(1));
+    // ensure the compaction is triggered and executed
+    try (HoodieBackedTableMetadata metadata = new 
HoodieBackedTableMetadata(context, writeConfig.getMetadataConfig(), 
writeConfig.getBasePath(), true)) {
+      HoodieTableMetaClient metadataMetaClient = 
metadata.getMetadataMetaClient();
+      final HoodieActiveTimeline activeTimeline = 
metadataMetaClient.reloadActiveTimeline();
+      Option<HoodieInstant> lastCompaction = 
activeTimeline.filterCompletedInstants()
+              .filter(s -> s.getAction().equals(COMMIT_ACTION)).lastInstant();
+      assertTrue(lastCompaction.isPresent());
+      // create pending instant in data table
+      
testTable.addRequestedCommit(HoodieActiveTimeline.createNewInstantTime(1));
+      // continue writing
+      for (int i = 0; i < maxNumDeltaCommits; i++) {
+        doWriteOperation(testTable, 
HoodieActiveTimeline.createNewInstantTime(1));
+      }
+      Throwable t = assertThrows(HoodieMetadataException.class, () -> 
doWriteOperation(testTable, HoodieActiveTimeline.createNewInstantTime(1)));
+      assertTrue(t.getMessage().startsWith(String.format("Metadata table's 
deltacommits exceeded %d: ", maxNumDeltaCommits)));
+      assertEquals(maxNumDeltaCommits + 1,
+              
activeTimeline.reload().getDeltaCommitTimeline().findInstantsAfter(lastCompaction.get().getTimestamp()).countInstants());
+    }
+  }
+
   @Test
   public void testNonPartitioned() throws Exception {
     init(HoodieTableType.COPY_ON_WRITE, false);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
index 9b5e8c1dd6f0..1c042d051aa9 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java
@@ -154,7 +154,7 @@ public class HoodieFileGroup implements Serializable {
   }
 
   public Stream<FileSlice> getAllFileSlicesBeforeOn(String maxInstantTime) {
-    return fileSlices.values().stream().filter(slice -> 
compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, 
maxInstantTime));
+    return getAllFileSlices().filter(slice -> 
compareTimestamps(slice.getBaseInstantTime(), LESSER_THAN_OR_EQUALS, 
maxInstantTime));
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index c6e524e8dd78..d051b33b74b5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -72,6 +72,7 @@ import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
 
 /**
  * Common thread-safe implementation for multiple TableFileSystemView 
Implementations.
@@ -254,7 +255,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
 
     // Duplicate key error when insert_overwrite same partition in multi 
writer, keep the instant with greater timestamp when the file group id conflicts
     Map<HoodieFileGroupId, HoodieInstant> replacedFileGroups = 
resultStream.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
-        (instance1, instance2) -> 
HoodieTimeline.compareTimestamps(instance1.getTimestamp(), 
HoodieTimeline.LESSER_THAN, instance2.getTimestamp()) ? instance2 : instance1));
+        (instance1, instance2) -> compareTimestamps(instance1.getTimestamp(), 
HoodieTimeline.LESSER_THAN, instance2.getTimestamp()) ? instance2 : instance1));
     resetReplacedFileGroups(replacedFileGroups);
     LOG.info("Took " + hoodieTimer.endTimer() + " ms to read  " + 
replacedTimeline.countInstants() + " instants, "
         + replacedFileGroups.size() + " replaced file groups");
@@ -397,9 +398,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
       try {
         fileStatusMap.put(partitionPair, 
metaClient.getFs().listStatus(absolutePartitionPath));
       } catch (IOException e) {
-        // Create the path if it does not exist already
         if (!metaClient.getFs().exists(absolutePartitionPath)) {
-          metaClient.getFs().mkdirs(absolutePartitionPath);
           fileStatusMap.put(partitionPair, new FileStatus[0]);
         } else {
           // in case the partition path was created by another caller
@@ -466,9 +465,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
     try {
       return metaClient.getFs().listStatus(partitionPath);
     } catch (IOException e) {
-      // Create the path if it does not exist already
       if (!metaClient.getFs().exists(partitionPath)) {
-        metaClient.getFs().mkdirs(partitionPath);
         return new FileStatus[0];
       } else {
         // in case the partition path was created by another caller
@@ -702,7 +699,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
     return fetchAllStoredFileGroups(partitionPath)
         .filter(fileGroup -> 
!isFileGroupReplacedBeforeOrOn(fileGroup.getFileGroupId(), maxCommitTime))
         .map(fileGroup -> Option.fromJavaOptional(fileGroup.getAllBaseFiles()
-            .filter(baseFile -> 
HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), 
HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime
+            .filter(baseFile -> compareTimestamps(baseFile.getCommitTime(), 
HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime
             ))
             .filter(df -> !isBaseFileDueToPendingCompaction(df) && 
!isBaseFileDueToPendingClustering(df)).findFirst()))
         .filter(Option::isPresent).map(Option::get)
@@ -719,7 +716,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
         return Option.empty();
       } else {
         return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup -> 
fileGroup.getAllBaseFiles()
-                .filter(baseFile -> 
HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), 
HoodieTimeline.EQUALS,
+                .filter(baseFile -> 
compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.EQUALS,
                     instantTime)).filter(df -> 
!isBaseFileDueToPendingCompaction(df) && 
!isBaseFileDueToPendingClustering(df)).findFirst().orElse(null))
             .map(df -> addBootstrapBaseFileIfPresent(new 
HoodieFileGroupId(partitionPath, fileId), df));
       }
@@ -1444,7 +1441,10 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
    * @param maxInstantTime The max instant time
    */
   private Option<FileSlice> fetchAllLogsMergedFileSlice(HoodieFileGroup 
fileGroup, String maxInstantTime) {
-    List<FileSlice> fileSlices = 
fileGroup.getAllFileSlicesBeforeOn(maxInstantTime).collect(Collectors.toList());
+    List<FileSlice> fileSlices = 
fileGroup.getAllRawFileSlices().collect(Collectors.toList());
+    fileSlices = fileSlices.stream()
+        .filter(slice -> compareTimestamps(slice.getBaseInstantTime(), 
LESSER_THAN_OR_EQUALS, maxInstantTime))
+        .collect(Collectors.toList());
     if (fileSlices.size() == 0) {
       return Option.empty();
     }
@@ -1506,7 +1506,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
       return false;
     }
 
-    return HoodieTimeline.compareTimestamps(instant, GREATER_THAN, 
hoodieInstantOption.get().getTimestamp());
+    return compareTimestamps(instant, GREATER_THAN, 
hoodieInstantOption.get().getTimestamp());
   }
 
   private boolean isFileGroupReplacedBeforeOrOn(HoodieFileGroupId fileGroupId, 
String instant) {
@@ -1515,7 +1515,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
       return false;
     }
 
-    return HoodieTimeline.compareTimestamps(instant, GREATER_THAN_OR_EQUALS, 
hoodieInstantOption.get().getTimestamp());
+    return compareTimestamps(instant, GREATER_THAN_OR_EQUALS, 
hoodieInstantOption.get().getTimestamp());
   }
 
   private boolean isFileGroupReplacedAfterOrOn(HoodieFileGroupId fileGroupId, 
String instant) {
@@ -1524,7 +1524,7 @@ public abstract class AbstractTableFileSystemView 
implements SyncableFileSystemV
       return false;
     }
 
-    return HoodieTimeline.compareTimestamps(instant, LESSER_THAN_OR_EQUALS, 
hoodieInstantOption.get().getTimestamp());
+    return compareTimestamps(instant, LESSER_THAN_OR_EQUALS, 
hoodieInstantOption.get().getTimestamp());
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
index 51797677016c..164b4c8ae3f5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -55,6 +56,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Implementation of {@link HoodieTableMetadata} based file-system-backed 
table metadata.
@@ -173,7 +175,12 @@ public class FileSystemBackedTableMetadata extends 
AbstractHoodieTableMetadata {
       // Need to use serializable file status here, see HUDI-5936
       List<HoodieSerializableFileStatus> dirToFileListing = 
engineContext.flatMap(pathsToList, path -> {
         FileSystem fileSystem = path.getFileSystem(hadoopConf.get());
-        return 
Arrays.stream(HoodieSerializableFileStatus.fromFileStatuses(fileSystem.listStatus(path)));
+        try {
+          return 
Arrays.stream(HoodieSerializableFileStatus.fromFileStatuses(fileSystem.listStatus(path)));
+        } catch (FileNotFoundException e) {
+          // The partition may have been cleaned.
+          return Stream.empty();
+        }
       }, listingParallelism);
       pathsToList.clear();
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java
index a7cdf22f8020..9956c8850061 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieFileGroup.java
@@ -50,6 +50,7 @@ public class TestHoodieFileGroup {
       fileGroup.addBaseFile(baseFile);
     }
     assertEquals(2, fileGroup.getAllFileSlices().count());
+    assertEquals(2, fileGroup.getAllFileSlicesBeforeOn("002").count());
     assertTrue(!fileGroup.getAllFileSlices().anyMatch(s -> 
s.getBaseInstantTime().equals("002")));
     assertEquals(3, fileGroup.getAllFileSlicesIncludingInflight().count());
     
assertTrue(fileGroup.getLatestFileSlice().get().getBaseInstantTime().equals("001"));
diff --git 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
index ec0354355795..b15b4f2842cd 100644
--- 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
+++ 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/BigQuerySyncConfig.java
@@ -58,6 +58,15 @@ public class BigQuerySyncConfig extends HoodieSyncConfig 
implements Serializable
       .markAdvanced()
       .withDocumentation("Name of the target project in BigQuery");
 
+  public static final ConfigProperty<String> BIGQUERY_SYNC_BILLING_PROJECT_ID 
= ConfigProperty
+      .key("hoodie.gcp.bigquery.sync.billing.project.id")
+      .noDefaultValue()
+      .sinceVersion("0.15.1")
+      .markAdvanced()
+      .withDocumentation("Name of the billing project id in BigQuery. By 
default it uses the "
+          + "configuration from `hoodie.gcp.bigquery.sync.project_id` if this 
configuration is "
+          + "not set. This can only be used with manifest file based 
approach");
+  
   public static final ConfigProperty<String> BIGQUERY_SYNC_DATASET_NAME = 
ConfigProperty
       .key("hoodie.gcp.bigquery.sync.dataset_name")
       .noDefaultValue()
@@ -156,6 +165,8 @@ public class BigQuerySyncConfig extends HoodieSyncConfig 
implements Serializable
 
     @Parameter(names = {"--project-id"}, description = "Name of the target 
project in BigQuery", required = true)
     public String projectId;
+    @Parameter(names = {"--billing-project-id"}, description = "Name of the 
billing project in BigQuery. This can only be used with 
--use-bq-manifest-file", required = false)
+    public String billingProjectId;
     @Parameter(names = {"--dataset-name"}, description = "Name of the target 
dataset in BigQuery", required = true)
     public String datasetName;
     @Parameter(names = {"--dataset-location"}, description = "Location of the 
target dataset in BigQuery", required = true)
@@ -181,6 +192,7 @@ public class BigQuerySyncConfig extends HoodieSyncConfig 
implements Serializable
     public TypedProperties toProps() {
       final TypedProperties props = hoodieSyncConfigParams.toProps();
       props.setPropertyIfNonNull(BIGQUERY_SYNC_PROJECT_ID.key(), projectId);
+      props.setPropertyIfNonNull(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(), 
billingProjectId);
       props.setPropertyIfNonNull(BIGQUERY_SYNC_DATASET_NAME.key(), 
datasetName);
       props.setPropertyIfNonNull(BIGQUERY_SYNC_DATASET_LOCATION.key(), 
datasetLocation);
       props.setPropertyIfNonNull(BIGQUERY_SYNC_TABLE_NAME.key(), 
hoodieSyncConfigParams.tableName);
diff --git 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
index af56194214df..11733352670f 100644
--- 
a/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
+++ 
b/hudi-gcp/src/main/java/org/apache/hudi/gcp/bigquery/HoodieBigQuerySyncClient.java
@@ -56,6 +56,8 @@ import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BIG_
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_LOCATION;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
+import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BILLING_PROJECT_ID;
+
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER;
 
 public class HoodieBigQuerySyncClient extends HoodieSyncClient {
@@ -64,6 +66,7 @@ public class HoodieBigQuerySyncClient extends 
HoodieSyncClient {
 
   protected final BigQuerySyncConfig config;
   private final String projectId;
+  private final String billingProjectId;
   private final String bigLakeConnectionId;
   private final String datasetName;
   private final boolean requirePartitionFilter;
@@ -73,6 +76,7 @@ public class HoodieBigQuerySyncClient extends 
HoodieSyncClient {
     super(config);
     this.config = config;
     this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID);
+    this.billingProjectId = config.getString(BIGQUERY_SYNC_BILLING_PROJECT_ID);
     this.bigLakeConnectionId = 
config.getString(BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID);
     this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME);
     this.requirePartitionFilter = 
config.getBoolean(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER);
@@ -84,6 +88,7 @@ public class HoodieBigQuerySyncClient extends 
HoodieSyncClient {
     super(config);
     this.config = config;
     this.projectId = config.getString(BIGQUERY_SYNC_PROJECT_ID);
+    this.billingProjectId = 
config.getStringOrDefault(BIGQUERY_SYNC_BILLING_PROJECT_ID, this.projectId);
     this.datasetName = config.getString(BIGQUERY_SYNC_DATASET_NAME);
     this.requirePartitionFilter = 
config.getBoolean(BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER);
     this.bigquery = bigquery;
@@ -129,7 +134,7 @@ public class HoodieBigQuerySyncClient extends 
HoodieSyncClient {
       QueryJobConfiguration queryConfig = 
QueryJobConfiguration.newBuilder(query)
           .setUseLegacySql(false)
           .build();
-      JobId jobId = 
JobId.newBuilder().setProject(projectId).setRandomJob().build();
+      JobId jobId = 
JobId.newBuilder().setProject(billingProjectId).setRandomJob().build();
       Job queryJob = 
bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
 
       queryJob = queryJob.waitFor();
diff --git 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
index d31566df1315..910027b45303 100644
--- 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
+++ 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestBigQuerySyncConfig.java
@@ -34,6 +34,7 @@ import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATA
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
+import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BILLING_PROJECT_ID;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX;
 import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SYNC_BASE_PATH;
@@ -49,6 +50,7 @@ public class TestBigQuerySyncConfig {
   public void testGetConfigs() {
     Properties props = new Properties();
     props.setProperty(BIGQUERY_SYNC_PROJECT_ID.key(), "fooproject");
+    props.setProperty(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(), 
"foobillingproject");
     props.setProperty(BIGQUERY_SYNC_DATASET_NAME.key(), "foodataset");
     props.setProperty(BIGQUERY_SYNC_DATASET_LOCATION.key(), "US");
     props.setProperty(BIGQUERY_SYNC_TABLE_NAME.key(), "footable");
@@ -61,6 +63,7 @@ public class TestBigQuerySyncConfig {
     props.setProperty(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING.key(), "true");
     BigQuerySyncConfig syncConfig = new BigQuerySyncConfig(props);
     assertEquals("fooproject", syncConfig.getString(BIGQUERY_SYNC_PROJECT_ID));
+    assertEquals("foobillingproject", 
syncConfig.getString(BIGQUERY_SYNC_BILLING_PROJECT_ID));
     assertEquals("foodataset", 
syncConfig.getString(BIGQUERY_SYNC_DATASET_NAME));
     assertEquals("US", syncConfig.getString(BIGQUERY_SYNC_DATASET_LOCATION));
     assertEquals("footable", syncConfig.getString(BIGQUERY_SYNC_TABLE_NAME));
diff --git 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
index 37b2800b563d..8519c5bdafd0 100644
--- 
a/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
+++ 
b/hudi-gcp/src/test/java/org/apache/hudi/gcp/bigquery/TestHoodieBigQuerySyncClient.java
@@ -37,17 +37,22 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.ArgumentCaptor;
 
 import java.nio.file.Path;
 import java.util.Properties;
 
+import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_BILLING_PROJECT_ID;
+import static 
org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestHoodieBigQuerySyncClient {
   private static final String PROJECT_ID = "test_project";
+  private static final String BILLING_PROJECT_ID = "test_billing_project";
   private static final String MANIFEST_FILE_URI = "file:/manifest_file";
   private static final String SOURCE_PREFIX = "file:/manifest_file/date=*";
   private static final String TEST_TABLE = "test_table";
@@ -73,12 +78,38 @@ public class TestHoodieBigQuerySyncClient {
   @BeforeEach
   void setup() {
     properties = new Properties();
-    properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID.key(), 
PROJECT_ID);
+    properties.setProperty(BIGQUERY_SYNC_PROJECT_ID.key(), PROJECT_ID);
+    properties.setProperty(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(), 
BILLING_PROJECT_ID);
     
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME.key(), 
TEST_DATASET);
     properties.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), 
tempDir.toString());
     
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key(),
 "true");
   }
 
+  /**
+   * Checks which project the job created by createOrUpdateTableUsingBqManifestFile
+   * is submitted under: the billing project when one is configured, otherwise the
+   * regular sync project.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testCreateOrUpdateTableUsingManifestWithBillingProjectId(boolean setBillingProjectId) {
+    // Capture whatever JobInfo the client hands to the mocked BigQuery service
+    ArgumentCaptor<JobInfo> submittedJob = ArgumentCaptor.forClass(JobInfo.class);
+    Job job = mock(Job.class);
+    when(mockBigQuery.create(submittedJob.capture())).thenReturn(job);
+
+    Properties config = new Properties();
+    config.setProperty(BIGQUERY_SYNC_PROJECT_ID.key(), PROJECT_ID);
+    config.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME.key(), TEST_DATASET);
+    config.setProperty(HoodieSyncConfig.META_SYNC_BASE_PATH.key(), tempDir.toString());
+    config.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_REQUIRE_PARTITION_FILTER.key(), "true");
+
+    // The billing project is optional; the expectation follows whether it was set
+    final String expectedProject;
+    if (setBillingProjectId) {
+      config.setProperty(BIGQUERY_SYNC_BILLING_PROJECT_ID.key(), BILLING_PROJECT_ID);
+      expectedProject = BILLING_PROJECT_ID;
+    } else {
+      expectedProject = PROJECT_ID;
+    }
+
+    HoodieBigQuerySyncClient client =
+        new HoodieBigQuerySyncClient(new BigQuerySyncConfig(config), mockBigQuery);
+    Schema tableSchema = Schema.of(Field.of("field", StandardSQLTypeName.STRING));
+    client.createOrUpdateTableUsingBqManifestFile(TEST_TABLE, MANIFEST_FILE_URI, SOURCE_PREFIX, tableSchema);
+
+    assertEquals(expectedProject, submittedJob.getValue().getJobId().getProject());
+  }
+
   @Test
   void createTableWithManifestFile_partitioned() throws Exception {
     
properties.setProperty(BigQuerySyncConfig.BIGQUERY_SYNC_BIG_LAKE_CONNECTION_ID.key(),
 "my-project.us.bl_connection");
diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml
index 174ba397c780..abf316daf79c 100644
--- a/hudi-integ-test/pom.xml
+++ b/hudi-integ-test/pom.xml
@@ -375,7 +375,9 @@
 
   <properties>
     
<dockerCompose.envFile>${project.basedir}/compose_env</dockerCompose.envFile>
-    
<dockerCompose.file>${project.basedir}/../docker/compose/docker-compose_hadoop284_hive233_spark244.yml</dockerCompose.file>
+    <dockerCompose.file>
+      
${project.basedir}/../docker/compose/docker-compose_hadoop284_hive233_spark244.yml
+    </dockerCompose.file>
     <docker.compose.skip>${skipITs}</docker.compose.skip>
     <main.basedir>${project.parent.basedir}</main.basedir>
   </properties>
@@ -499,7 +501,9 @@
     <profile>
       <id>m1-mac</id>
       <properties>
-        
<dockerCompose.file>${project.basedir}/../docker/compose/docker-compose_hadoop284_hive233_spark244_mac_aarch64.yml</dockerCompose.file>
+        <dockerCompose.file>
+          
${project.basedir}/../docker/compose/docker-compose_hadoop284_hive233_spark244_mac_aarch64.yml
+        </dockerCompose.file>
       </properties>
       <activation>
         <os>
@@ -509,4 +513,4 @@
       </activation>
     </profile>
   </profiles>
-</project>
+</project>
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index d8110a31f09c..332a1c311435 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -667,7 +667,7 @@ object DataSourceWriteOptions {
     .defaultValue("true")
     .markAdvanced()
     .sinceVersion("0.14.0")
-    .withDocumentation("Controls whether spark sql prepped update, delete, and 
merge are enabled.")
+    .withDocumentation("Controls whether spark sql prepped update and delete 
are enabled.")
 
   val OVERWRITE_MODE: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.overwrite.mode")
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 41e8ba902a7e..a311368fe451 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -21,6 +21,8 @@ import org.apache.avro.Schema
 import org.apache.avro.generic.GenericData
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.shims.ShimLoader
 import 
org.apache.hudi.AutoRecordKeyGenerationUtils.mayBeValidateParamsForAutoGenerationOfRecordKeys
 import org.apache.hudi.AvroConversionUtils.{convertAvroSchemaToStructType, 
convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
 import 
org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
@@ -1006,7 +1008,19 @@ class HoodieSparkSqlWriterInternal {
       
properties.put(HiveSyncConfigHolder.HIVE_SYNC_SCHEMA_STRING_LENGTH_THRESHOLD.key,
 
spark.sessionState.conf.getConf(StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD).toString)
       properties.put(HoodieSyncConfig.META_SYNC_SPARK_VERSION.key, 
SPARK_VERSION)
       
properties.put(HoodieSyncConfig.META_SYNC_USE_FILE_LISTING_FROM_METADATA.key, 
hoodieConfig.getBoolean(HoodieMetadataConfig.ENABLE))
-
+      if ((fs.getConf.get(HiveConf.ConfVars.METASTOREPWD.varname) == null || 
fs.getConf.get(HiveConf.ConfVars.METASTOREPWD.varname).isEmpty) &&
+        (properties.get(HiveSyncConfigHolder.HIVE_PASS.key()) == null || 
properties.get(HiveSyncConfigHolder.HIVE_PASS.key()).toString.isEmpty || 
properties.get(HiveSyncConfigHolder.HIVE_PASS.key()).toString.equalsIgnoreCase(HiveSyncConfigHolder.HIVE_PASS.defaultValue()))){
+        try {
+          val passwd = 
ShimLoader.getHadoopShims.getPassword(spark.sparkContext.hadoopConfiguration, 
HiveConf.ConfVars.METASTOREPWD.varname)
+          if (passwd != null && !passwd.isEmpty) {
+            fs.getConf.set(HiveConf.ConfVars.METASTOREPWD.varname, passwd)
+            properties.put(HiveSyncConfigHolder.HIVE_PASS.key(), passwd)
+          }
+        } catch {
+          case e: Exception =>
+            log.info("Exception while trying to get Meta Sync password from 
hadoop credential store", e)
+        }
+      }
       // Collect exceptions in list because we want all sync to run. Then we 
can throw
       val failedMetaSyncs = new mutable.HashMap[String,HoodieException]()
       syncClientToolClassSet.foreach(impl => {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index dd8e62ab53c9..de7a9baa9054 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -275,11 +275,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       sparkSession.conf.set("spark.sql.crossJoin.enabled","true")
     }
 
-    val projectedJoinedDF: DataFrame = projectedJoinedDataset
-    // Create the write parameters
     val props = buildMergeIntoConfig(hoodieCatalogTable)
+    val processedInputDf: DataFrame = getProcessedInputDf
     // Do the upsert
-    executeUpsert(projectedJoinedDF, props)
+    executeUpsert(processedInputDf, props)
     // Refresh the table in the catalog
     sparkSession.catalog.refreshTable(hoodieCatalogTable.table.qualifiedName)
 
@@ -290,6 +289,10 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
   private val insertingActions: Seq[InsertAction] = 
mergeInto.notMatchedActions.collect { case u: InsertAction => u}
   private val deletingActions: Seq[DeleteAction] = 
mergeInto.matchedActions.collect { case u: DeleteAction => u}
 
+  /** True when the target table's config reports record key fields as present. */
+  private def hasPrimaryKey(): Boolean =
+    hoodieCatalogTable.tableConfig.getRecordKeyFields.isPresent
+
   /**
    * Here we're adjusting incoming (source) dataset in case its schema is 
divergent from
    * the target table, to make sure it (at a bare minimum)
@@ -328,29 +331,30 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
    * <li>{@code ts = source.sts}</li>
    * </ul>
    */
-  def projectedJoinedDataset: DataFrame = {
+  private def getProcessedInputDf: DataFrame = {
     val resolver = sparkSession.sessionState.analyzer.resolver
 
-    // We want to join the source and target tables.
-    // Then we want to project the output so that we have the meta columns 
from the target table
-    // followed by the data columns of the source table
-    val tableMetaCols = mergeInto.targetTable.output.filter(a => 
isMetaField(a.name))
-    val joinData = 
sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable, 
mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
-    val incomingDataCols = 
joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
-    // for pkless table, we need to project the meta columns
-    val hasPrimaryKey = 
hoodieCatalogTable.tableConfig.getRecordKeyFields.isPresent
-    val projectedJoinPlan = if (!hasPrimaryKey || 
sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), 
"false") == "true") {
+    // For pkless table, we need to project the meta columns by joining with 
the target table;
+    // for a Hudi table with record key, we use the source table and rely on 
Hudi's tagging
+    // to identify inserts, updates, and deletes to avoid the join
+    val inputPlan = if (!hasPrimaryKey()) {
+      // We want to join the source and target tables.
+      // Then we want to project the output so that we have the meta columns 
from the target table
+      // followed by the data columns of the source table
+      val tableMetaCols = mergeInto.targetTable.output.filter(a => 
isMetaField(a.name))
+      val joinData = 
sparkAdapter.getCatalystPlanUtils.createMITJoin(mergeInto.sourceTable, 
mergeInto.targetTable, LeftOuter, Some(mergeInto.mergeCondition), "NONE")
+      val incomingDataCols = 
joinData.output.filterNot(mergeInto.targetTable.outputSet.contains)
       Project(tableMetaCols ++ incomingDataCols, joinData)
     } else {
-      Project(incomingDataCols, joinData)
+      mergeInto.sourceTable
     }
 
-    val projectedJoinOutput = projectedJoinPlan.output
+    val inputPlanAttributes = inputPlan.output
 
     val requiredAttributesMap = recordKeyAttributeToConditionExpression ++ 
preCombineAttributeAssociatedExpression
 
     val (existingAttributesMap, missingAttributesMap) = 
requiredAttributesMap.partition {
-      case (keyAttr, _) => projectedJoinOutput.exists(attr => 
resolver(keyAttr.name, attr.name))
+      case (keyAttr, _) => inputPlanAttributes.exists(attr => 
resolver(keyAttr.name, attr.name))
     }
 
     // This is to handle the situation where condition is something like 
"s0.s_id = t0.id" so In the source table
@@ -362,7 +366,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     //       them according to aforementioned heuristic) to meet Hudi's 
requirements
     val additionalColumns: Seq[NamedExpression] =
       missingAttributesMap.flatMap {
-        case (keyAttr, sourceExpression) if !projectedJoinOutput.exists(attr 
=> resolver(attr.name, keyAttr.name)) =>
+        case (keyAttr, sourceExpression) if !inputPlanAttributes.exists(attr 
=> resolver(attr.name, keyAttr.name)) =>
           Seq(Alias(sourceExpression, keyAttr.name)())
 
         case _ => Seq()
@@ -372,7 +376,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     // matches to that one of the target table. This is necessary b/c unlike 
Spark, Avro is case-sensitive
     // and therefore would fail downstream if case of corresponding columns 
don't match
     val existingAttributes = existingAttributesMap.map(_._1)
-    val adjustedSourceTableOutput = projectedJoinOutput.map { attr =>
+    val adjustedSourceTableOutput = inputPlanAttributes.map { attr =>
       existingAttributes.find(keyAttr => resolver(keyAttr.name, attr.name)) 
match {
         // To align the casing we just rename the attribute to match that one 
of the
         // target table
@@ -381,7 +385,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       }
     }
 
-    val amendedPlan = Project(adjustedSourceTableOutput ++ additionalColumns, 
projectedJoinPlan)
+    val amendedPlan = Project(adjustedSourceTableOutput ++ additionalColumns, 
inputPlan)
 
     Dataset.ofRows(sparkSession, amendedPlan)
   }
@@ -575,7 +579,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     // NOTE: We're relying on [[sourceDataset]] here instead of 
[[mergeInto.sourceTable]],
     //       as it could be amended to add missing primary-key and/or 
pre-combine columns.
     //       Please check [[sourceDataset]] scala-doc for more details
-    (projectedJoinedDataset.queryExecution.analyzed.output ++ 
mergeInto.targetTable.output).filterNot(a => isMetaField(a.name))
+    (getProcessedInputDf.queryExecution.analyzed.output ++ 
mergeInto.targetTable.output).filterNot(a => isMetaField(a.name))
   }
 
   private def resolvesToSourceAttribute(expr: Expression): Boolean = {
@@ -618,9 +622,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     val preCombineField = hoodieCatalogTable.preCombineKey.getOrElse("")
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, 
tableConfig)
     // for pkless tables, we need to enable optimized merge
-    val hasPrimaryKey = tableConfig.getRecordKeyFields.isPresent
-    val enableOptimizedMerge = if (!hasPrimaryKey) "true" else 
sparkSession.sqlContext.conf.getConfString(SPARK_SQL_OPTIMIZED_WRITES.key(), 
"false")
-    val keyGeneratorClassName = if (enableOptimizedMerge == "true") {
+    val isPrimaryKeylessTable = !hasPrimaryKey()
+    val keyGeneratorClassName = if (isPrimaryKeylessTable) {
       classOf[MergeIntoKeyGenerator].getCanonicalName
     } else {
       classOf[SqlKeyGenerator].getCanonicalName
@@ -658,7 +661,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
       CANONICALIZE_SCHEMA.key -> "false",
       SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true",
       HoodieSparkSqlWriter.SQL_MERGE_INTO_WRITES.key -> "true",
-      HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY -> 
enableOptimizedMerge,
+      HoodieWriteConfig.SPARK_SQL_MERGE_INTO_PREPPED_KEY -> 
isPrimaryKeylessTable.toString,
       HoodieWriteConfig.COMBINE_BEFORE_UPSERT.key() -> 
(!StringUtils.isNullOrEmpty(preCombineField)).toString
     )
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
index bc2a169779c5..2e06da2aee51 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
@@ -17,25 +17,33 @@
 
 package org.apache.spark.sql.hudi
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.HoodieSparkRecordMerger
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.config.HoodieStorageConfig
-import org.apache.hudi.common.model.HoodieAvroRecordMerger
+import org.apache.hudi.common.config.{HoodieCommonConfig, 
HoodieMetadataConfig, HoodieStorageConfig}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieRecord}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils
+import org.apache.hudi.common.table.view.{FileSystemViewManager, 
FileSystemViewStorageConfig, SyncableFileSystemView}
+import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.ExceptionUtil.getRootCause
 import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
+import org.apache.hudi.metadata.HoodieTableMetadata
 import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
+import org.apache.spark
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.checkMessageContains
+import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.util.Utils
 import org.joda.time.DateTimeZone
 import org.scalactic.source
+import org.scalatest.Assertions.assertResult
 import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
 
 import java.io.File
@@ -228,6 +236,13 @@ class HoodieSparkSqlTestBase extends FunSuite with 
BeforeAndAfterAll {
       HoodieRecordType.AVRO
     }
   }
+
+  /**
+   * Asserts that the schema returned by `select * from tableName`, after dropping
+   * the columns listed in HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION, matches
+   * the expected fields.
+   *
+   * @param tableName            table to query
+   * @param expectedStructFields expected non-meta fields, in order
+   */
+  protected def validateTableSchema(tableName: String,
+                                    expectedStructFields: List[StructField]): Unit = {
+    val queriedFields = spark.sql(s"select * from $tableName").schema.fields
+    val dataFields = queriedFields.filterNot { field =>
+      HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(field.name)
+    }
+    assertResult(expectedStructFields)(dataFields)
+  }
 }
 
 object HoodieSparkSqlTestBase {
@@ -255,4 +270,34 @@ object HoodieSparkSqlTestBase {
   private def checkMessageContains(e: Throwable, text: String): Boolean =
     e.getMessage.trim.contains(text.trim)
 
+  /**
+   * Builds a meta client for the table at the given base path together with a
+   * file system view obtained from a metadata-aware view manager.
+   *
+   * @param basePath base path of the Hudi table
+   * @return the meta client and the file system view for that table
+   */
+  def getMetaClientAndFileSystemView(basePath: String):
+  (HoodieTableMetaClient, SyncableFileSystemView) = {
+    val hadoopConf = new Configuration()
+    val tableMetaClient: HoodieTableMetaClient = HoodieTableMetaClient.builder
+      .setConf(hadoopConf)
+      .setBasePath(basePath)
+      .build
+    // Every config below is the builder default; only the Hadoop configuration is shared
+    val manager: FileSystemViewManager = FileSystemViewManager.createViewManagerWithTableMetadata(
+      new HoodieLocalEngineContext(hadoopConf),
+      HoodieMetadataConfig.newBuilder.build,
+      FileSystemViewStorageConfig.newBuilder.build,
+      HoodieCommonConfig.newBuilder.build
+    )
+    val view: SyncableFileSystemView = manager.getFileSystemView(tableMetaClient)
+    (tableMetaClient, view)
+  }
+
+  /**
+   * Replaces the file at the given path with an empty file, which stands in for a
+   * corrupted file in a Hudi table.
+   *
+   * @param filePath path of the file to replace
+   */
+  def replaceWithEmptyFile(filePath: Path): Unit = {
+    // Resolve the file system from the path itself so a scheme-qualified path is
+    // honoured; for an unqualified path this is the same default file system as before.
+    val fs = filePath.getFileSystem(new Configuration)
+    fs.delete(filePath, true)
+    // create() hands back an open output stream: close it straight away so the
+    // handle is not leaked and the empty file is finalized.
+    fs.create(filePath, true).close()
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDescribeTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDescribeTable.scala
new file mode 100644
index 000000000000..76c8f7b80691
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDescribeTable.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.ddl
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+import java.util.function.Predicate
+
+/**
+ * Checks the output of the DESCRIBE / DESC command variants for a Hudi table and
+ * for a plain parquet table: the 'Location' row must appear only when the
+ * 'extended' or 'formatted' keyword is used.
+ */
+class TestDescribeTable extends HoodieSparkSqlTestBase {
+
+  // Written as an anonymous class rather than a lambda, just for scala-2.11 compatibility
+  private val locationInFirstColumn: Predicate[Row] = new Predicate[Row] {
+    // `==` is null-safe, unlike calling equals on the cell value
+    def test(row: Row): Boolean = row(0) == "Location"
+  }
+
+  /** Runs the statement and reports whether any result row has "Location" in its first column. */
+  private def hasLocationRow(statement: String): Boolean =
+    spark.sql(statement).collectAsList().stream().anyMatch(locationInFirstColumn)
+
+  /**
+   * Runs every describe variant against the table and asserts where the
+   * 'Location' row is present.
+   *
+   * NOTE(review): this leaves hoodie.schema.on.read.enable=true on the session
+   * when it returns -- confirm no later test relies on the previous value.
+   */
+  private def checkDescribeCommands(tbName: String): Unit = {
+    spark.sql("set hoodie.schema.on.read.enable=false")
+    assert(hasLocationRow(s"describe extended $tbName"))
+
+    spark.sql("set hoodie.schema.on.read.enable=true")
+    assert(hasLocationRow(s"desc formatted $tbName"))
+    assert(hasLocationRow(s"describe table extended $tbName"))
+
+    // DESC returns only columns and partitions when run without 'extended' or 'formatted' keywords
+    assert(!hasLocationRow(s"describe table $tbName"))
+    assert(!hasLocationRow(s"desc table $tbName"))
+    assert(!hasLocationRow(s"desc $tbName"))
+  }
+
+  test("Test desc hudi table command") {
+    withTempDir { tmp =>
+      val tbName = "wk_date"
+      val basePath = s"${tmp.getCanonicalPath}/$tbName"
+
+      spark.sql(
+        s"""
+           |create table $tbName (id int, driver string, precomb int, dat string)
+           | using hudi
+           | partitioned by(dat)
+           | tblproperties(
+           |   type='cow',
+           |   primaryKey='id',
+           |   preCombineField='precomb'
+           | )
+           | location '$basePath'
+       """.stripMargin)
+
+      checkDescribeCommands(tbName)
+    }
+  }
+
+  test("Test desc non-hudi table command") {
+    withTempDir { tmp =>
+      val tbName = "wk_date"
+      val basePath = s"${tmp.getCanonicalPath}/$tbName"
+
+      spark.sql(
+        s"""
+           |create table $tbName (
+           | id int,
+           | driver string,
+           | precomb int,
+           | dat string
+           |)
+           | using parquet
+           | location '$basePath'
+       """.stripMargin)
+
+      checkDescribeCommands(tbName)
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 80ee86ee6f21..76a1d540c924 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -19,8 +19,11 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
 import org.apache.hudi.{DataSourceReadOptions, HoodieDataSourceHelpers, 
HoodieSparkUtils, ScalaAssertionSupport}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, 
StructField}
+import org.junit.jupiter.api.Assertions.assertTrue
 
 class TestMergeIntoTable extends HoodieSparkSqlTestBase with 
ScalaAssertionSupport {
 
@@ -28,6 +31,8 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with 
ScalaAssertionSuppo
     Seq(true, false).foreach { sparkSqlOptimizedWrites =>
       withRecordType()(withTempDir { tmp =>
         spark.sql("set hoodie.payload.combined.schema.validate = false")
+        spark.sql("set hoodie.metadata.index.column.stats.enable = false")
+        spark.sql("set hoodie.metadata.index.partition.stats.enable = false")
         val tableName = generateTableName
         // Create table
         spark.sql(
@@ -36,8 +41,10 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with 
ScalaAssertionSuppo
              |  id int,
              |  name string,
              |  price double,
-             |  ts long
+             |  ts int,
+             |  partition string
              |) using hudi
+             | partitioned by (partition)
              | location '${tmp.getCanonicalPath}'
              | tblproperties (
              |  primaryKey ='id',
@@ -45,76 +52,113 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
              | )
        """.stripMargin)
 
-        // test with optimized sql merge enabled / disabled.
-        spark.sql(s"set 
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
+          // test with optimized sql merge enabled / disabled.
+          spark.sql(s"set 
${SPARK_SQL_OPTIMIZED_WRITES.key()}=$sparkSqlOptimizedWrites")
 
-        // First merge with a extra input field 'flag' (insert a new record)
-        spark.sql(
-          s"""
-             | merge into $tableName
-             | using (
-             |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '1' as 
flag
-             | ) s0
-             | on s0.id = $tableName.id
-             | when matched and flag = '1' then update set
-             | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
-             | when not matched and flag = '1' then insert *
+          val structFields = List(
+            StructField("id", IntegerType, nullable = true),
+            StructField("name", StringType, nullable = true),
+            StructField("price", DoubleType, nullable = true),
+            StructField("ts", IntegerType, nullable = true),
+            StructField("partition", StringType, nullable = true))
+          // First merge with an extra input field 'flag' (insert a new record)
+          spark.sql(
+            s"""
+               | merge into $tableName
+               | using (
+               |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts, 'p1' 
as partition, '1' as flag
+               |  union
+               |  select 2 as id, 'a2' as name, 20 as price, 1000 as ts, 'p2' 
as partition, '1' as flag
+               |  union
+               |  select 3 as id, 'a3' as name, 30 as price, 1000 as ts, 'p3' 
as partition, '1' as flag
+               | ) s0
+               | on s0.id = $tableName.id
+               | when matched and flag = '1' then update set
+               | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+               | when not matched and flag = '1' then insert *
        """.stripMargin)
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 10.0, 1000)
-        )
+          validateTableSchema(tableName, structFields)
+          checkAnswer(s"select id, name, price, ts, partition from 
$tableName")(
+            Seq(1, "a1", 10.0, 1000, "p1"),
+            Seq(2, "a2", 20.0, 1000, "p2"),
+            Seq(3, "a3", 30.0, 1000, "p3")
+          )
 
-        // Second merge (update the record)
-        spark.sql(
-          s"""
-             | merge into $tableName
-             | using (
-             |  select 1 as id, 'a1' as name, 10 as price, 1001 as ts
-             | ) s0
-             | on s0.id = $tableName.id
-             | when matched then update set
-             | id = s0.id, name = s0.name, price = s0.price + 
$tableName.price, ts = s0.ts
-             | when not matched then insert *
-       """.stripMargin)
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 20.0, 1001)
-        )
+          // Second merge (update the record) with different field names in 
the source
+          spark.sql(
+            s"""
+               | merge into $tableName
+               | using (
+               |  select 1 as _id, 'a1' as name, 10 as _price, 1001 as _ts, 
'p1' as partition
+               | ) s0
+               | on s0._id = $tableName.id
+               | when matched then update set
+               | id = s0._id, name = s0.name, price = s0._price + 
$tableName.price, ts = s0._ts
+               | """.stripMargin)
+          validateTableSchema(tableName, structFields)
+          checkAnswer(s"select id, name, price, ts, partition from 
$tableName")(
+            Seq(1, "a1", 20.0, 1001, "p1"),
+            Seq(2, "a2", 20.0, 1000, "p2"),
+            Seq(3, "a3", 30.0, 1000, "p3")
+          )
 
-        // the third time merge (update & insert the record)
-        spark.sql(
-          s"""
-             | merge into $tableName
-             | using (
-             |  select * from (
-             |  select 1 as id, 'a1' as name, 10 as price, 1002 as ts
-             |  union all
-             |  select 2 as id, 'a2' as name, 12 as price, 1001 as ts
-             |  )
-             | ) s0
-             | on s0.id = $tableName.id
-             | when matched then update set
-             | id = s0.id, name = s0.name, price = s0.price + 
$tableName.price, ts = s0.ts
-             | when not matched and s0.id % 2 = 0 then insert *
+          // the third time merge (update & insert the record)
+          spark.sql(
+            s"""
+               | merge into $tableName
+               | using (
+               |  select * from (
+               |  select 1 as id, 'a1' as name, 10 as price, 1002 as ts, 'p1' 
as partition
+               |  union all
+               |  select 4 as id, 'a4' as name, 40 as price, 1001 as ts, 'p4' 
as partition
+               |  )
+               | ) s0
+               | on s0.id = $tableName.id
+               | when matched then update set
+               | id = s0.id, name = s0.name, price = s0.price + 
$tableName.price, ts = s0.ts
+               | when not matched and s0.id % 2 = 0 then insert *
        """.stripMargin)
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 30.0, 1002),
-          Seq(2, "a2", 12.0, 1001)
+        validateTableSchema(tableName, structFields)
+        checkAnswer(s"select id, name, price, ts, partition from $tableName")(
+          Seq(1, "a1", 30.0, 1002, "p1"),
+          Seq(2, "a2", 20.0, 1000, "p2"),
+          Seq(3, "a3", 30.0, 1000, "p3"),
+          Seq(4, "a4", 40.0, 1001, "p4")
         )
 
+        // Validate that MERGE INTO only scans the affected partitions in the target table
+        // Corrupt the files in other partitions not receiving updates
+        val (metaClient, fsv) = 
HoodieSparkSqlTestBase.getMetaClientAndFileSystemView(tmp.getCanonicalPath)
+        Seq("p2", "p3", "p4").map(e => "partition=" + e).foreach(partition => {
+          assertTrue(fsv.getLatestFileSlices(partition).count() > 0)
+          fsv.getLatestFileSlices(partition).forEach(new 
java.util.function.Consumer[FileSlice] {
+            override def accept(fileSlice: FileSlice): Unit = {
+              if (fileSlice.getBaseFile.isPresent) {
+                HoodieSparkSqlTestBase.replaceWithEmptyFile(
+                  fileSlice.getBaseFile.get.getHadoopPath)
+              }
+              fileSlice.getLogFiles.forEach(new 
java.util.function.Consumer[HoodieLogFile] {
+                override def accept(logFile: HoodieLogFile): Unit = {
+                  HoodieSparkSqlTestBase.replaceWithEmptyFile(logFile.getPath)
+                }
+              })
+            }
+          })
+        })
         // the fourth merge (delete the record)
         spark.sql(
           s"""
              | merge into $tableName
              | using (
-             |  select 1 as id, 'a1' as name, 12 as price, 1003 as ts
+             |  select 1 as id, 'a1' as name, 12 as price, 1003 as ts, 'p1' as 
partition
              | ) s0
              | on s0.id = $tableName.id
              | when matched and s0.id != 1 then update set
              |    id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
              | when matched and s0.id = 1 then delete
              | when not matched then insert *
-       """.stripMargin)
-        val cnt = spark.sql(s"select * from $tableName where id = 1").count()
+             | """.stripMargin)
+        val cnt = spark.sql(s"select * from $tableName where partition = 
'p1'").count()
         assertResult(0)(cnt)
       })
     }
diff --git 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
index d64bc94301a1..7917f93de539 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.sql.hudi.analysis
 
-import org.apache.hadoop.fs.Path
 import org.apache.hudi.{DataSourceReadOptions, DefaultSource, 
SparkAdapterSupport}
+
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.MatchResolvedTable
 import 
org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer.resolveExpressionByPlanChildren
-import org.apache.spark.sql.catalyst.analysis.{AnalysisErrorAt, 
EliminateSubqueryAliases, NamedRelation, UnresolvedAttribute, 
UnresolvedPartitionSpec}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, 
NamedRelation, UnresolvedAttribute, UnresolvedPartitionSpec}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logcal.{HoodieQuery, 
HoodieTableChanges, HoodieTableChangesOptionsParser}
@@ -30,6 +31,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
 import org.apache.spark.sql.connector.catalog.{Table, V1Table}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.command.DescribeTableCommand
 import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
@@ -280,6 +282,9 @@ case class HoodieSpark32PlusPostAnalysisRule(sparkSession: 
SparkSession) extends
           retainData = true
         )
 
+      case DescribeRelation(MatchResolvedTable(_, id, HoodieV1OrV2Table(_)), 
partitionSpec, isExtended, output) =>
+        DescribeTableCommand(id.asTableIdentifier, partitionSpec, isExtended, 
output)
+
       case _ => plan
     }
   }
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 7d9b2afa33ce..fbb9dac49677 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -319,6 +319,7 @@ public class HiveSyncTool extends HoodieSyncTool implements 
AutoCloseable {
       // Sync the partitions if needed
       // find dropped partitions, if any, in the latest commit
       Set<String> droppedPartitions = 
syncClient.getDroppedPartitionsSince(lastCommitTimeSynced, 
lastCommitCompletionTimeSynced);
+      LOG.info("Partitions dropped since last sync: {}", 
droppedPartitions.size());
       partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, 
droppedPartitions);
     }
     return partitionsChanged;
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 5bd4838a62f5..732a15845c89 100644
--- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -1281,6 +1281,129 @@ public class TestHiveSyncTool {
         "The last commit that was synced should be updated in the 
TBLPROPERTIES");
   }
 
+  @ParameterizedTest
+  @MethodSource("syncModeAndEnablePushDown")
+  void testGetPartitionEvents_droppedStoragePartitionNotPresentInMetastore(
+      String syncMode, String enablePushDown) throws Exception {
+    hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+    hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(), 
enablePushDown);
+
+    // Create a table with 1 partition
+    String instantTime1 = "100";
+    HiveTestUtil.createCOWTable(instantTime1, 1, true);
+
+    reInitHiveSyncClient();
+    // Sync the table to metastore
+    reSyncHiveTable();
+    
+    List<Partition> partitionsInMetastore = 
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+    assertEquals(1, partitionsInMetastore.size(), "Should have 1 partition in 
metastore");
+
+    // Add a partition to storage but don't sync it to metastore
+    String instantTime2 = "101";
+    String newPartition = "2010/02/01";
+    HiveTestUtil.addCOWPartition(newPartition, true, true, instantTime2);
+    
+    // Verify the partition is not in metastore yet
+    partitionsInMetastore = 
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+    assertEquals(1, partitionsInMetastore.size(), "Should have 1 partition in 
metastore");
+
+    // Delete the partition that was never synced to metastore
+    String instantTime3 = "102";
+    HiveTestUtil.createReplaceCommit(instantTime3, newPartition, 
WriteOperationType.DELETE_PARTITION, true, true);
+    
+    // Add another partition to storage but don't sync to metastore
+    String instantTime4 = "103";
+    String addPartition = "2010/04/01";
+    HiveTestUtil.addCOWPartition(addPartition, true, true, instantTime4);
+
+    reInitHiveSyncClient();
+
+    Set<String> droppedPartitionsOnStorage = 
hiveClient.getDroppedPartitionsSince(Option.of(instantTime1), 
Option.of(instantTime1));
+    List<String> writtenPartitionsOnStorage = 
hiveClient.getWrittenPartitionsSince(Option.of(instantTime1), 
Option.of(instantTime1));
+    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(
+        partitionsInMetastore, writtenPartitionsOnStorage, 
droppedPartitionsOnStorage);
+    
+    // Verify no DROP event is generated for partition that was never in metastore
+    long dropEvents = partitionEvents.stream()
+        .filter(e -> e.eventType == PartitionEventType.DROP)
+        .count();
+    assertEquals(0, dropEvents,
+        "No DROP partition event should be generated for partition that was 
never in metastore");
+    
+    // Verify ADD event is generated for the new partition that was added to storage
+    List<PartitionEvent> addEvents = partitionEvents.stream()
+        .filter(e -> e.eventType == PartitionEventType.ADD)
+        .collect(Collectors.toList());
+    assertEquals(1, addEvents.size(),
+        "ADD partition event should be generated for new partition added to 
storage");
+    assertEquals(addPartition, addEvents.get(0).storagePartition);
+  }
+
+  @ParameterizedTest
+  @MethodSource("syncModeAndEnablePushDown")
+  void testGetPartitionEvents_droppedStoragePartitionPresentInMetastore(
+      String syncMode, String enablePushDown) throws Exception {
+    hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
+    hiveSyncProps.setProperty(HIVE_SYNC_FILTER_PUSHDOWN_ENABLED.key(), 
enablePushDown);
+
+    // Create a table with 1 partition
+    String instantTime1 = "100";
+    HiveTestUtil.createCOWTable(instantTime1, 1, true);
+
+    reInitHiveSyncClient();
+    // Sync the table to metastore
+    reSyncHiveTable();
+    
+    List<Partition> partitionsInMetastore = 
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+    assertEquals(1, partitionsInMetastore.size(), "Should have 1 partition in 
metastore");
+
+    // Add a partition and sync it to metastore
+    String instantTime2 = "101";
+    String newPartition = "2010/02/01";
+    HiveTestUtil.addCOWPartition(newPartition, true, true, instantTime2);
+
+    reInitHiveSyncClient();
+    // Sync the table to metastore
+    reSyncHiveTable();
+    
+    partitionsInMetastore = 
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+    assertEquals(2, partitionsInMetastore.size(), "Should have 2 partitions in 
metastore");
+    
+    // Now delete the partition that exists in metastore
+    String instantTime3 = "102";
+    HiveTestUtil.createReplaceCommit(instantTime3, newPartition, 
WriteOperationType.DELETE_PARTITION, true, true);
+    
+    // Add another partition to storage but don't sync to metastore
+    String instantTime4 = "103";
+    String addPartition = "2010/04/01";
+    HiveTestUtil.addCOWPartition(addPartition, true, true, instantTime4);
+    
+    reInitHiveSyncClient();
+    
+    // Get partition events
+    Set<String> droppedPartitionsOnStorage = 
hiveClient.getDroppedPartitionsSince(Option.of(instantTime2), 
Option.of(instantTime2));
+    List<String> writtenPartitionsOnStorage = 
hiveClient.getWrittenPartitionsSince(Option.of(instantTime2), 
Option.of(instantTime2));
+    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(
+        partitionsInMetastore, writtenPartitionsOnStorage, 
droppedPartitionsOnStorage);
+    
+    // Verify DROP event is generated for partition that exists in metastore
+    List<PartitionEvent> dropEvents = partitionEvents.stream()
+        .filter(e -> e.eventType == PartitionEventType.DROP)
+        .collect(Collectors.toList());
+    assertEquals(1, dropEvents.size(),
+        "DROP partition event should be generated for partition that exists in 
metastore");
+    assertEquals(newPartition, dropEvents.get(0).storagePartition);
+    
+    // Verify ADD event is generated for the new partition that was added to storage
+    List<PartitionEvent> addEvents = partitionEvents.stream()
+        .filter(e -> e.eventType == PartitionEventType.ADD)
+        .collect(Collectors.toList());
+    assertEquals(1, addEvents.size(),
+        "ADD partition event should be generated for new partition added to 
storage");
+    assertEquals(addPartition, addEvents.get(0).storagePartition);
+  }
+
   @ParameterizedTest
   @MethodSource("syncModeAndEnablePushDown")
   public void testNonPartitionedSync(String syncMode, String enablePushDown) 
throws Exception {
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index 6ec35c435a05..6f0faca58fa1 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -213,11 +213,14 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
       // Check if the partition values or if hdfs path is the same
       List<String> storagePartitionValues = 
partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
 
-      if (droppedPartitionsOnStorage.contains(storagePartition)) {
-        events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
-      } else {
-        if (!storagePartitionValues.isEmpty()) {
-          String storageValue = String.join(", ", storagePartitionValues);
+      if (!storagePartitionValues.isEmpty()) {
+        String storageValue = String.join(", ", storagePartitionValues);
+        if (droppedPartitionsOnStorage.contains(storagePartition)) {
+          if (paths.containsKey(storageValue)) {
+            // Add partition drop event only if it exists in the metastore
+            events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
+          }
+        } else {
           if (!paths.containsKey(storageValue)) {
             events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
           } else if 
(!paths.get(storageValue).equals(fullStoragePartitionPath)) {
diff --git a/packaging/bundle-validation/ci_run.sh 
b/packaging/bundle-validation/ci_run.sh
index 505ee9c7c2d4..599c9b0b019d 100755
--- a/packaging/bundle-validation/ci_run.sh
+++ b/packaging/bundle-validation/ci_run.sh
@@ -180,6 +180,12 @@ fi
 
 ls -l $TMP_JARS_DIR
 
+# Fail early if Spark bundle is missing (validate.sh requires it)
+if [ -z "$(ls $TMP_JARS_DIR/hudi-spark*.jar 2>/dev/null)" ]; then
+  echo "Error: Hudi Spark bundle jar not found in $TMP_JARS_DIR. Ensure the 
build produces packaging/hudi-spark-bundle/target/hudi-*-$HUDI_VERSION.jar and 
ci_run.sh copies it."
+  exit 1
+fi
+
 # Copy test dataset
 TMP_DATA_DIR=/tmp/data/$(date +%s)
 mkdir -p $TMP_DATA_DIR/stocks/data
diff --git a/packaging/bundle-validation/validate.sh 
b/packaging/bundle-validation/validate.sh
index 75d4227c74a3..eefeee2a1ce1 100755
--- a/packaging/bundle-validation/validate.sh
+++ b/packaging/bundle-validation/validate.sh
@@ -38,6 +38,13 @@ ln -sf $JARS_DIR/hudi-utilities-slim*.jar 
$JARS_DIR/utilities-slim.jar
 ln -sf $JARS_DIR/hudi-kafka-connect-bundle*.jar $JARS_DIR/kafka-connect.jar
 ln -sf $JARS_DIR/hudi-metaserver-server-bundle*.jar $JARS_DIR/metaserver.jar
 
+# Resolve spark bundle jar to actual file (symlink may be broken if no hudi-spark*.jar was mounted)
+SPARK_JAR=$(ls $JARS_DIR/hudi-spark*.jar 2>/dev/null | head -1)
+if [ -z "$SPARK_JAR" ] || [ ! -f "$SPARK_JAR" ]; then
+  echo "::error::validate.sh Hudi Spark bundle jar not found in $JARS_DIR (no 
hudi-spark*.jar)"
+  exit 1
+fi
+
 ##
 # Function to change Java runtime version by changing JAVA_HOME
 ##
@@ -76,15 +83,17 @@ test_spark_hadoop_mr_bundles () {
     local HIVE_PID=$!
     change_java_runtime_version
     echo "::warning::validate.sh Writing sample data via Spark DataSource and 
run Hive Sync..."
-    $SPARK_HOME/bin/spark-shell --jars $JARS_DIR/spark.jar < 
$WORKDIR/spark_hadoop_mr/write.scala
+    # Use resolved SPARK_JAR and driver classpath so HoodieCatalog (spark-defaults.conf for Spark 3.2/3.3) is visible
+    $SPARK_HOME/bin/spark-shell --jars "$SPARK_JAR" --conf 
"spark.driver.extraClassPath=$SPARK_JAR" < $WORKDIR/spark_hadoop_mr/write.scala
 
     echo "::warning::validate.sh Query and validate the results using Spark 
SQL"
     # save Spark SQL query results
-    $SPARK_HOME/bin/spark-shell --jars $JARS_DIR/spark.jar \
+    $SPARK_HOME/bin/spark-shell --jars "$SPARK_JAR" --conf 
"spark.driver.extraClassPath=$SPARK_JAR" \
       -i <(echo 'spark.sql("select * from 
trips").coalesce(1).write.csv("/tmp/spark-bundle/sparksql/trips/results"); 
System.exit(0)')
-    numRecords=$(cat /tmp/spark-bundle/sparksql/trips/results/*.csv | wc -l)
+    numRecords=$(cat /tmp/spark-bundle/sparksql/trips/results/*.csv 
2>/dev/null | wc -l)
+    numRecords=${numRecords:-0}
     if [ "$numRecords" -ne 10 ]; then
-        echo "::error::validate.sh Spark SQL validation failed."
+        echo "::error::validate.sh Spark SQL validation failed (expected 10 
records, got $numRecords)."
         exit 1
     fi
     echo "::warning::validate.sh Query and validate the results using HiveQL"
@@ -246,13 +255,14 @@ test_metaserver_bundle () {
 
     change_java_runtime_version
     echo "::warning::validate.sh Writing sample data via Spark DataSource."
-    $SPARK_HOME/bin/spark-shell --jars $JARS_DIR/spark.jar < 
$WORKDIR/service/write.scala
+    $SPARK_HOME/bin/spark-shell --jars "$SPARK_JAR" < 
$WORKDIR/service/write.scala
     ls /tmp/hudi-bundles/tests/trips
 
     echo "::warning::validate.sh Query and validate the results using Spark 
DataSource"
     # save Spark DataSource query results
-    $SPARK_HOME/bin/spark-shell --jars $JARS_DIR/spark.jar  < 
$WORKDIR/service/read.scala
-    numRecords=$(cat 
/tmp/metaserver-bundle/sparkdatasource/trips/results/*.csv | wc -l)
+    $SPARK_HOME/bin/spark-shell --jars "$SPARK_JAR" < 
$WORKDIR/service/read.scala
+    numRecords=$(cat 
/tmp/metaserver-bundle/sparkdatasource/trips/results/*.csv 2>/dev/null | wc -l)
+    numRecords=${numRecords:-0}
     echo $numRecords
     use_default_java_runtime
     if [ "$numRecords" -ne 10 ]; then
@@ -289,7 +299,7 @@ else
 fi
 
 echo "::warning::validate.sh validating utilities slim bundle"
-test_utilities_bundle $JARS_DIR/utilities-slim.jar $JARS_DIR/spark.jar
+test_utilities_bundle $JARS_DIR/utilities-slim.jar $SPARK_JAR
 if [ "$?" -ne 0 ]; then
     exit 1
 fi

Reply via email to