This is an automated email from the ASF dual-hosted git repository.

haridsv pushed a commit to branch 5.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.3 by this push:
     new 878910febf PHOENIX-7799 Coalesce splits by region server to avoid 
hotspotting from concurrent mappers (#2411) (#2430)
878910febf is described below

commit 878910febf9f8cd3b39b0d375d55434812c3df05
Author: Rahul Kumar <[email protected]>
AuthorDate: Wed Apr 29 19:06:54 2026 +0530

    PHOENIX-7799 Coalesce splits by region server to avoid hotspotting from 
concurrent mappers (#2411) (#2430)
    
    ---------
    
    Co-authored-by: Rahul Kumar <[email protected]>
---
 phoenix-core-client/pom.xml                        |   1 -
 phoenix-core-server/pom.xml                        |   1 -
 .../phoenix/mapreduce/PhoenixInputSplit.java       |  49 +++-
 .../mapreduce/PhoenixSyncTableInputFormat.java     | 141 +++++++++-
 .../phoenix/mapreduce/PhoenixSyncTableMapper.java  | 249 +++++++++++------
 .../phoenix/mapreduce/PhoenixSyncTableTool.java    |  37 ++-
 phoenix-core/pom.xml                               |   1 -
 .../phoenix/end2end/PhoenixSyncTableToolIT.java    |  45 ++-
 .../phoenix/mapreduce/PhoenixInputSplitTest.java   | 200 ++++++++++++++
 .../mapreduce/PhoenixSyncTableInputFormatTest.java | 304 +++++++++++++++++++++
 pom.xml                                            |   6 +
 11 files changed, 913 insertions(+), 121 deletions(-)

diff --git a/phoenix-core-client/pom.xml b/phoenix-core-client/pom.xml
index ea9e303132..e1ba8727f3 100644
--- a/phoenix-core-client/pom.xml
+++ b/phoenix-core-client/pom.xml
@@ -253,7 +253,6 @@
     <dependency>
       <groupId>org.bouncycastle</groupId>
       <artifactId>bcprov-jdk18on</artifactId>
-      <version>1.79</version>
     </dependency>
   </dependencies>
 
diff --git a/phoenix-core-server/pom.xml b/phoenix-core-server/pom.xml
index 76757abb65..c906adce0d 100644
--- a/phoenix-core-server/pom.xml
+++ b/phoenix-core-server/pom.xml
@@ -176,7 +176,6 @@
     <dependency>
       <groupId>org.bouncycastle</groupId>
       <artifactId>bcprov-jdk18on</artifactId>
-      <version>1.79</version>
     </dependency>
     <dependency>
       <groupId>org.xerial.snappy</groupId>
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
index c71e6ca5d1..c49ce59232 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
@@ -38,7 +38,7 @@ import 
org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 public class PhoenixInputSplit extends InputSplit implements Writable {
 
   private List<Scan> scans;
-  private KeyRange keyRange;
+  private List<KeyRange> keyRanges;
   private String regionLocation = null;
   private long splitSize = 0;
 
@@ -68,13 +68,43 @@ public class PhoenixInputSplit extends InputSplit 
implements Writable {
     return scans;
   }
 
+  /**
+   * Returns the overall KeyRange spanning this split. For coalesced splits, 
spans from the first
+   * region's lower bound to the last region's upper bound. Computed on-demand 
from keyRanges.
+   * @return KeyRange spanning the entire split, or null if keyRanges is empty
+   */
   public KeyRange getKeyRange() {
-    return keyRange;
+    if (keyRanges == null || keyRanges.isEmpty()) {
+      return null;
+    }
+    return KeyRange.getKeyRange(keyRanges.get(0).getLowerRange(),
+      keyRanges.get(keyRanges.size() - 1).getUpperRange());
+  }
+
+  /**
+   * Returns all KeyRanges for this split. For coalesced splits, returns 
multiple KeyRanges (one per
+   * region). For non-coalesced splits, returns a single-element list.
+   * @return List of KeyRanges, never null
+   */
+  public List<KeyRange> getKeyRanges() {
+    return keyRanges;
+  }
+
+  /**
+   * Checks if this split is coalesced (contains multiple regions).
+   * @return true if split contains multiple regions
+   */
+  public boolean isCoalesced() {
+    return keyRanges.size() > 1;
   }
 
   private void init() {
-    this.keyRange =
-      KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size() 
- 1).getStopRow());
+    // Initialize keyRanges from scans
+    this.keyRanges = Lists.newArrayListWithExpectedSize(scans.size());
+    for (Scan scan : scans) {
+      KeyRange kr = KeyRange.getKeyRange(scan.getStartRow(), 
scan.getStopRow());
+      this.keyRanges.add(kr);
+    }
   }
 
   @Override
@@ -126,7 +156,8 @@ public class PhoenixInputSplit extends InputSplit 
implements Writable {
   public int hashCode() {
     final int prime = 31;
     int result = 1;
-    result = prime * result + keyRange.hashCode();
+    KeyRange range = getKeyRange();
+    result = prime * result + (range == null ? 0 : range.hashCode());
     return result;
   }
 
@@ -142,11 +173,13 @@ public class PhoenixInputSplit extends InputSplit 
implements Writable {
       return false;
     }
     PhoenixInputSplit other = (PhoenixInputSplit) obj;
-    if (keyRange == null) {
-      if (other.keyRange != null) {
+    KeyRange thisRange = getKeyRange();
+    KeyRange otherRange = other.getKeyRange();
+    if (thisRange == null) {
+      if (otherRange != null) {
         return false;
       }
-    } else if (!keyRange.equals(other.keyRange)) {
+    } else if (!thisRange.equals(otherRange)) {
       return false;
     }
     return true;
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java
index d717c3f7bb..b2dd739c0c 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java
@@ -21,9 +21,13 @@ import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -31,8 +35,10 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +53,13 @@ public class PhoenixSyncTableInputFormat extends 
PhoenixInputFormat<DBWritable>
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PhoenixSyncTableInputFormat.class);
 
+  // Sentinel server name used when a region location lookup returns null or 
has a null server.
+  // This can happen transiently during a region-in-transition (RIT) event 
(e.g. a split).
+  // Splits that cannot be placed on a specific server are coalesced together 
under this key
+  // rather than failing the job, since split coalescing is an optimisation, 
not a correctness
+  // requirement.
+  static final String UNKNOWN_SERVER = "UNKNOWN_SERVER";
+
   public PhoenixSyncTableInputFormat() {
     super();
   }
@@ -94,15 +107,31 @@ public class PhoenixSyncTableInputFormat extends 
PhoenixInputFormat<DBWritable>
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
-    if (completedRegions.isEmpty()) {
-      LOGGER.info("No completed regions for table {} - processing all {} 
splits", tableName,
-        allSplits.size());
-      return allSplits;
-    }
 
     List<InputSplit> unprocessedSplits = filterCompletedSplits(allSplits, 
completedRegions);
     LOGGER.info("Found {} completed mapper regions for table {}, {} 
unprocessed splits remaining",
       completedRegions.size(), tableName, unprocessedSplits.size());
+
+    boolean enableSplitCoalescing =
+      conf.getBoolean(PhoenixSyncTableTool.PHOENIX_SYNC_TABLE_SPLIT_COALESCING,
+        PhoenixSyncTableTool.DEFAULT_PHOENIX_SYNC_TABLE_SPLIT_COALESCING);
+    LOGGER.info("Split coalescing enabled: {}, for table {}", 
enableSplitCoalescing, tableName);
+
+    if (enableSplitCoalescing && unprocessedSplits.size() > 1) {
+      try (Connection conn = ConnectionUtil.getInputConnection(conf)) {
+        PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+        byte[] physicalTableName = 
pConn.getTable(tableName).getPhysicalName().getBytes();
+        List<InputSplit> coalescedSplits =
+          coalesceSplits(unprocessedSplits, pConn.getQueryServices(), 
physicalTableName);
+        LOGGER.info("Split coalescing: {} unprocessed splits {} coalesced 
splits for table {}",
+          unprocessedSplits.size(), coalescedSplits.size(), tableName);
+        return coalescedSplits;
+      } catch (Exception e) {
+        throw new IOException(String.format("Failed to coalesce splits for 
table %s. "
+          + "Split coalescing is enabled but failed due to: %s.", tableName, 
e.getMessage()), e);
+      }
+    }
+
     return unprocessedSplits;
   }
 
@@ -133,6 +162,9 @@ public class PhoenixSyncTableInputFormat extends 
PhoenixInputFormat<DBWritable>
    */
   List<InputSplit> filterCompletedSplits(List<InputSplit> allSplits,
     List<KeyRange> completedRegions) {
+    if (completedRegions.isEmpty()) {
+      return allSplits;
+    }
     allSplits.sort((s1, s2) -> {
       PhoenixInputSplit ps1 = (PhoenixInputSplit) s1;
       PhoenixInputSplit ps2 = (PhoenixInputSplit) s2;
@@ -211,4 +243,103 @@ public class PhoenixSyncTableInputFormat extends 
PhoenixInputFormat<DBWritable>
     }
     return unprocessedSplits;
   }
+
+  /**
+   * Coalesces multiple region splits from the same RegionServer into single 
InputSplits. All
+   * regions from the same server are coalesced into one split, regardless of 
count or size. This
+   * reduces mapper count and avoids hot spotting when many concurrent mappers 
hit the same server.
+   * @param unprocessedSplits Splits remaining after filtering completed 
regions
+   * @param queryServices     ConnectionQueryServices for querying region 
locations
+   * @param physicalTableName Physical HBase table name
+   * @return Coalesced splits with all regions per server combined into one 
split
+   */
+  List<InputSplit> coalesceSplits(List<InputSplit> unprocessedSplits,
+    ConnectionQueryServices queryServices, byte[] physicalTableName)
+    throws IOException, InterruptedException, SQLException {
+    // Group splits by RegionServer location
+    Map<String, List<PhoenixInputSplit>> splitsByServer =
+      groupSplitsByServer(unprocessedSplits, queryServices, physicalTableName);
+
+    List<InputSplit> coalescedSplits = new ArrayList<>();
+
+    // For each RegionServer, create one coalesced split with ALL regions from 
that server
+    for (Map.Entry<String, List<PhoenixInputSplit>> entry : 
splitsByServer.entrySet()) {
+      String serverName = entry.getKey();
+      List<PhoenixInputSplit> serverSplits = entry.getValue();
+
+      // Sort splits by start key for sequential processing
+      serverSplits.sort((s1, s2) -> 
Bytes.compareTo(s1.getKeyRange().getLowerRange(),
+        s2.getKeyRange().getLowerRange()));
+      // Create single coalesced split with ALL regions from this server
+      coalescedSplits.add(createCoalescedSplit(serverSplits, serverName));
+    }
+
+    return coalescedSplits;
+  }
+
+  /**
+   * Groups splits by RegionServer location for locality-aware coalescing. Uses
+   * ConnectionQueryServices to determine which server hosts each region.
+   * <p>
+   * If the region location is unavailable (null location or null server 
name), which can happen
+   * transiently during a region-in-transition (RIT) event such as a split, 
the split is assigned to
+   * {@link #UNKNOWN_SERVER} rather than failing the job. Since split 
coalescing is an optimisation,
+   * a transient lookup failure should degrade gracefully, not abort the MR 
job.
+   * @param splits            List of splits to group
+   * @param queryServices     ConnectionQueryServices for querying region 
locations
+   * @param physicalTableName Physical HBase table name
+   * @return Map of server name to list of splits hosted on that server
+   */
+  private Map<String, List<PhoenixInputSplit>> 
groupSplitsByServer(List<InputSplit> splits,
+    ConnectionQueryServices queryServices, byte[] physicalTableName)
+    throws IOException, SQLException {
+    Map<String, List<PhoenixInputSplit>> splitsByServer = new HashMap<>();
+    for (InputSplit split : splits) {
+      PhoenixInputSplit pSplit = (PhoenixInputSplit) split;
+      KeyRange keyRange = pSplit.getKeyRange();
+      HRegionLocation regionLocation =
+        queryServices.getTableRegionLocation(physicalTableName, 
keyRange.getLowerRange());
+      String serverName;
+      if (regionLocation == null || regionLocation.getServerName() == null) {
+        LOGGER.warn(
+          "Could not determine region server for key: {}. "
+            + "Region may be in transition. Assigning split to {} bucket.",
+          Bytes.toStringBinary(keyRange.getLowerRange()), UNKNOWN_SERVER);
+        serverName = UNKNOWN_SERVER;
+      } else {
+        serverName = regionLocation.getServerName().getAddress().toString();
+      }
+      splitsByServer.computeIfAbsent(serverName, k -> new 
ArrayList<>()).add(pSplit);
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Split {} assigned to server {}",
+          Bytes.toStringBinary(keyRange.getLowerRange()), serverName);
+      }
+    }
+
+    return splitsByServer;
+  }
+
+  /**
+   * Creates a coalesced PhoenixInputSplit containing multiple regions. 
Combines scans and KeyRanges
+   * from individual splits into a single split.
+   * @param splits         List of splits to coalesce (from same RegionServer)
+   * @param serverLocation RegionServer location for data locality
+   * @return Coalesced PhoenixInputSplit
+   */
+  private PhoenixInputSplit createCoalescedSplit(List<PhoenixInputSplit> 
splits,
+    String serverLocation) throws IOException, InterruptedException {
+
+    List<Scan> allScans = new ArrayList<>();
+    long totalSize = 0;
+    // Extract all scans from individual splits
+    for (PhoenixInputSplit split : splits) {
+      allScans.addAll(split.getScans());
+      totalSize += split.getLength();
+    }
+
+    LOGGER.info("Created coalesced split with {} regions, {} MB from server 
{}", splits.size(),
+      totalSize / (1024 * 1024), serverLocation);
+    // Create a new PhoenixInputSplit, keyRanges will be derived from scans in 
init()
+    return new PhoenixInputSplit(allScans, totalSize, serverLocation);
+  }
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java
index acaac01fc3..65e932ae78 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -45,7 +44,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -88,16 +86,13 @@ public class PhoenixSyncTableMapper
   private Connection globalConnection;
   private PTable pTable;
   private byte[] physicalTableName;
-  private byte[] mapperRegionStart;
-  private byte[] mapperRegionEnd;
+  private List<KeyRange> regionKeyRanges;
   private PhoenixSyncTableOutputRepository syncTableOutputRepository;
-  private Timestamp mapperStartTime;
 
   @Override
   protected void setup(Context context) throws InterruptedException {
     try {
       super.setup(context);
-      mapperStartTime = new Timestamp(System.currentTimeMillis());
       this.conf = context.getConfiguration();
       tableName = PhoenixSyncTableTool.getPhoenixSyncTableName(conf);
       targetZkQuorum = 
PhoenixSyncTableTool.getPhoenixSyncTableTargetZkQuorum(conf);
@@ -123,18 +118,25 @@ public class PhoenixSyncTableMapper
   }
 
   /**
-   * Extracts mapper region boundaries from the PhoenixInputSplit
+   * Extracts region key ranges from the PhoenixInputSplit. Handles both 
single-region splits and
+   * coalesced splits with multiple regions.
    */
   private void extractRegionBoundariesFromSplit(Context context) {
     PhoenixInputSplit split = (PhoenixInputSplit) context.getInputSplit();
-    KeyRange keyRange = split.getKeyRange();
-    if (keyRange == null) {
+    regionKeyRanges = split.getKeyRanges();
+
+    if (regionKeyRanges == null || regionKeyRanges.isEmpty()) {
       throw new IllegalStateException(String.format(
-        "PhoenixInputSplit has no KeyRange for table: %s . Cannot determine 
region boundaries for sync operation.",
+        "PhoenixInputSplit has no KeyRanges for table: %s. Cannot determine 
region boundaries for sync operation.",
         tableName));
     }
-    mapperRegionStart = keyRange.getLowerRange();
-    mapperRegionEnd = keyRange.getUpperRange();
+
+    if (split.isCoalesced()) {
+      LOGGER.info("Mapper processing coalesced split with {} regions for table 
{}",
+        regionKeyRanges.size(), tableName);
+    } else {
+      LOGGER.info("Mapper processing single region split for table {}", 
tableName);
+    }
   }
 
   /**
@@ -156,67 +158,23 @@ public class PhoenixSyncTableMapper
   }
 
   /**
-   * Processes a mapper region by comparing chunks between source and target 
clusters. Gets already
-   * processed chunks from checkpoint table, resumes from check pointed 
progress and records final
-   * status for chunks & mapper (VERIFIED/MISMATCHED).
+   * Processes mapper region(s) by comparing chunks between source and target 
clusters. For
+   * coalesced splits, processes each region sequentially. Gets already 
processed chunks from
+   * checkpoint table, resumes from check pointed progress and records final 
status for chunks &
+   * mapper (VERIFIED/MISMATCHED).
    */
   @Override
   protected void map(NullWritable key, DBInputFormat.NullDBWritable value, 
Context context)
     throws IOException, InterruptedException {
+    context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
     try {
-      List<PhoenixSyncTableCheckpointOutputRow> processedChunks =
-        syncTableOutputRepository.getProcessedChunks(tableName, 
targetZkQuorum, fromTime, toTime,
-          tenantId, mapperRegionStart, mapperRegionEnd);
-      List<KeyRange> unprocessedRanges =
-        calculateUnprocessedRanges(mapperRegionStart, mapperRegionEnd, 
processedChunks);
-      boolean isStartKeyInclusive = 
shouldStartKeyBeInclusive(mapperRegionStart, processedChunks);
-      for (KeyRange range : unprocessedRanges) {
-        processMapperRanges(range.getLowerRange(), range.getUpperRange(), 
isStartKeyInclusive,
-          context);
-        isStartKeyInclusive = false;
-      }
-
-      long chunksMismatched = 
context.getCounter(SyncCounters.CHUNKS_MISMATCHED).getValue();
-      long chunksVerified = 
context.getCounter(SyncCounters.CHUNKS_VERIFIED).getValue();
-      long sourceRowsProcessed = 
context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).getValue();
-      long targetRowsProcessed = 
context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).getValue();
-      Timestamp mapperEndTime = new Timestamp(System.currentTimeMillis());
-      String counters = PhoenixSyncTableCheckpointOutputRow.CounterFormatter
-        .formatMapper(chunksVerified, chunksMismatched, sourceRowsProcessed, 
targetRowsProcessed);
-      if (sourceRowsProcessed > 0) {
-        if (chunksMismatched == 0) {
-          context.getCounter(SyncCounters.MAPPERS_VERIFIED).increment(1);
-          syncTableOutputRepository
-            .checkpointSyncTableResult(new 
PhoenixSyncTableCheckpointOutputRow.Builder()
-              .setTableName(tableName).setTargetCluster(targetZkQuorum)
-              
.setType(PhoenixSyncTableCheckpointOutputRow.Type.REGION).setFromTime(fromTime)
-              .setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun)
-              .setStartRowKey(mapperRegionStart).setEndRowKey(mapperRegionEnd)
-              .setStatus(PhoenixSyncTableCheckpointOutputRow.Status.VERIFIED)
-              
.setExecutionStartTime(mapperStartTime).setExecutionEndTime(mapperEndTime)
-              .setCounters(counters).build());
-          LOGGER.info(
-            "PhoenixSyncTable mapper completed with verified: {} verified 
chunks, {} mismatched chunks",
-            chunksVerified, chunksMismatched);
-        } else {
-          context.getCounter(SyncCounters.MAPPERS_MISMATCHED).increment(1);
-          LOGGER.warn(
-            "PhoenixSyncTable mapper completed with mismatch: {} verified 
chunks, {} mismatched chunks",
-            chunksVerified, chunksMismatched);
-          syncTableOutputRepository
-            .checkpointSyncTableResult(new 
PhoenixSyncTableCheckpointOutputRow.Builder()
-              .setTableName(tableName).setTargetCluster(targetZkQuorum)
-              
.setType(PhoenixSyncTableCheckpointOutputRow.Type.REGION).setFromTime(fromTime)
-              .setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun)
-              .setStartRowKey(mapperRegionStart).setEndRowKey(mapperRegionEnd)
-              .setStatus(PhoenixSyncTableCheckpointOutputRow.Status.MISMATCHED)
-              
.setExecutionStartTime(mapperStartTime).setExecutionEndTime(mapperEndTime)
-              .setCounters(counters).build());
-        }
-      } else {
-        LOGGER.info(
-          "No rows pending to process. All mapper region boundaries are 
covered for startKey:{}, endKey: {}",
-          mapperRegionStart, mapperRegionEnd);
+      // Process each region in the split (one or multiple for coalesced 
splits)
+      for (KeyRange keyRange : regionKeyRanges) {
+        byte[] regionStart = keyRange.getLowerRange();
+        byte[] regionEnd = keyRange.getUpperRange();
+        LOGGER.info("Processing region [{}, {}) from split for table {}",
+          Bytes.toStringBinary(regionStart), Bytes.toStringBinary(regionEnd), 
tableName);
+        processRegion(regionStart, regionEnd, context);
       }
     } catch (SQLException e) {
       tryClosingResources();
@@ -224,6 +182,124 @@ public class PhoenixSyncTableMapper
     }
   }
 
+  /**
+   * Processes a single region within a split (could be part of a coalesced 
split).
+   * @param regionStart Start key of the region
+   * @param regionEnd   End key of the region
+   * @param context     Mapper context
+   */
+  private void processRegion(byte[] regionStart, byte[] regionEnd, Context 
context)
+    throws SQLException, IOException, InterruptedException {
+
+    Timestamp regionStartTime = new Timestamp(System.currentTimeMillis());
+
+    // Get processed chunks for this specific region
+    List<PhoenixSyncTableCheckpointOutputRow> processedChunks =
+      syncTableOutputRepository.getProcessedChunks(tableName, targetZkQuorum, 
fromTime, toTime,
+        tenantId, regionStart, regionEnd);
+
+    // Calculate unprocessed ranges within this region
+    List<KeyRange> unprocessedRanges =
+      calculateUnprocessedRanges(regionStart, regionEnd, processedChunks);
+
+    // Track counters before processing this region
+    long verifiedBefore = 
context.getCounter(SyncCounters.CHUNKS_VERIFIED).getValue();
+    long mismatchedBefore = 
context.getCounter(SyncCounters.CHUNKS_MISMATCHED).getValue();
+    long sourceRowsBefore = 
context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).getValue();
+    long targetRowsBefore = 
context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).getValue();
+
+    // Process all unprocessed ranges in this region
+    boolean isStartKeyInclusive = shouldStartKeyBeInclusive(regionStart, 
processedChunks);
+    for (KeyRange range : unprocessedRanges) {
+      processMapperRanges(range.getLowerRange(), range.getUpperRange(), 
isStartKeyInclusive,
+        context);
+      isStartKeyInclusive = false;
+    }
+
+    // Calculate counters for this region only
+    long verifiedChunks =
+      context.getCounter(SyncCounters.CHUNKS_VERIFIED).getValue() - 
verifiedBefore;
+    long mismatchedChunks =
+      context.getCounter(SyncCounters.CHUNKS_MISMATCHED).getValue() - 
mismatchedBefore;
+    long sourceRowsProcessed =
+      context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).getValue() - 
sourceRowsBefore;
+    long targetRowsProcessed =
+      context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).getValue() - 
targetRowsBefore;
+
+    Timestamp regionEndTime = new Timestamp(System.currentTimeMillis());
+    String counters = PhoenixSyncTableCheckpointOutputRow.CounterFormatter
+      .formatMapper(verifiedChunks, mismatchedChunks, sourceRowsProcessed, 
targetRowsProcessed);
+    if (sourceRowsProcessed > 0) {
+      recordRegionCompletion(regionStart, regionEnd, regionStartTime, 
regionEndTime, verifiedChunks,
+        mismatchedChunks, counters, context);
+    } else {
+      LOGGER.info(
+        "No rows pending to process. All region boundaries are covered for 
startKey:{}, endKey: {}",
+        Bytes.toStringBinary(regionStart), Bytes.toStringBinary(regionEnd));
+    }
+  }
+
+  /**
+   * Records region completion by updating counters, recording checkpoint, and 
logging result.
+   * Consolidates all region completion logic to eliminate duplication.
+   * @param regionStart      Region start key
+   * @param regionEnd        Region end key
+   * @param regionStartTime  Region processing start time
+   * @param regionEndTime    Region processing end time
+   * @param verifiedChunks   Number of verified chunks
+   * @param mismatchedChunks Number of mismatched chunks
+   * @param counters         Formatted counter string
+   * @param context          Mapper context
+   */
+  private void recordRegionCompletion(byte[] regionStart, byte[] regionEnd,
+    Timestamp regionStartTime, Timestamp regionEndTime, long verifiedChunks, 
long mismatchedChunks,
+    String counters, Context context) throws SQLException {
+
+    boolean isVerified = mismatchedChunks == 0;
+    PhoenixSyncTableCheckpointOutputRow.Status status = isVerified
+      ? PhoenixSyncTableCheckpointOutputRow.Status.VERIFIED
+      : PhoenixSyncTableCheckpointOutputRow.Status.MISMATCHED;
+
+    context.getCounter(isVerified ? SyncCounters.MAPPERS_VERIFIED : 
SyncCounters.MAPPERS_MISMATCHED)
+      .increment(1);
+
+    recordRegionCheckpoint(regionStart, regionEnd, status, regionStartTime, 
regionEndTime,
+      counters);
+
+    String logMessage = String.format(
+      "PhoenixSyncTable region [%s, %s) completed with %s: %d verified chunks, 
%d mismatched chunks",
+      Bytes.toStringBinary(regionStart), Bytes.toStringBinary(regionEnd),
+      isVerified ? "verified" : "mismatch", verifiedChunks, mismatchedChunks);
+
+    if (isVerified) {
+      LOGGER.info(logMessage);
+    } else {
+      LOGGER.warn(logMessage);
+    }
+  }
+
+  /**
+   * Records a region checkpoint to the checkpoint table.
+   * @param regionStart     Region start key
+   * @param regionEnd       Region end key
+   * @param status          Status (VERIFIED or MISMATCHED)
+   * @param regionStartTime Region processing start time
+   * @param regionEndTime   Region processing end time
+   * @param counters        Formatted counter string
+   */
+  private void recordRegionCheckpoint(byte[] regionStart, byte[] regionEnd,
+    PhoenixSyncTableCheckpointOutputRow.Status status, Timestamp 
regionStartTime,
+    Timestamp regionEndTime, String counters) throws SQLException {
+
+    syncTableOutputRepository
+      .checkpointSyncTableResult(new 
PhoenixSyncTableCheckpointOutputRow.Builder()
+        .setTableName(tableName).setTargetCluster(targetZkQuorum)
+        
.setType(PhoenixSyncTableCheckpointOutputRow.Type.REGION).setFromTime(fromTime)
+        
.setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun).setStartRowKey(regionStart)
+        
.setEndRowKey(regionEnd).setStatus(status).setExecutionStartTime(regionStartTime)
+        .setExecutionEndTime(regionEndTime).setCounters(counters).build());
+  }
+
   /**
    * Processes a chunk range by comparing source and target cluster data. 
Source chunking: Breaks
    * data into size-based chunks within given mapper region boundary. Target 
chunking: Follows
@@ -408,13 +484,7 @@ public class PhoenixSyncTableMapper
       
scan.setAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_SIZE_BYTES,
         Bytes.toBytes(chunkSizeBytes));
     }
-    // Use the half of the HBase RPC timeout value as the server page size to 
make sure
-    // that the HBase region server will be able to send a heartbeat message 
to the
-    // client before the client times out.
-    long syncTablePageTimeoutMs = (long) 
(conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
-      QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_TIMEOUT) * 0.5);
-    scan.setAttribute(BaseScannerRegionObserverConstants.SERVER_PAGE_SIZE_MS,
-      Bytes.toBytes(syncTablePageTimeoutMs));
+    ScanUtil.setScanAttributeForPaging(scan, phoenixConn);
     ResultScanner scanner = hTable.getScanner(scan);
     return new ChunkScannerContext(hTable, scanner);
   }
@@ -453,14 +523,8 @@ public class PhoenixSyncTableMapper
 
   private void handleVerifiedChunk(ChunkInfo sourceChunk, Context context, 
String counters)
     throws SQLException {
-    syncTableOutputRepository.checkpointSyncTableResult(
-      new PhoenixSyncTableCheckpointOutputRow.Builder().setTableName(tableName)
-        
.setTargetCluster(targetZkQuorum).setType(PhoenixSyncTableCheckpointOutputRow.Type.CHUNK)
-        
.setFromTime(fromTime).setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun)
-        .setStartRowKey(sourceChunk.startKey).setEndRowKey(sourceChunk.endKey)
-        .setStatus(PhoenixSyncTableCheckpointOutputRow.Status.VERIFIED)
-        .setExecutionStartTime(sourceChunk.executionStartTime)
-        
.setExecutionEndTime(sourceChunk.executionEndTime).setCounters(counters).build());
+    recordChunkCheckpoint(sourceChunk, 
PhoenixSyncTableCheckpointOutputRow.Status.VERIFIED,
+      counters);
     context.getCounter(SyncCounters.CHUNKS_VERIFIED).increment(1);
   }
 
@@ -468,16 +532,27 @@ public class PhoenixSyncTableMapper
     throws SQLException {
     LOGGER.warn("Chunk mismatch detected for table: {}, with startKey: {}, 
endKey {}", tableName,
       Bytes.toStringBinary(sourceChunk.startKey), 
Bytes.toStringBinary(sourceChunk.endKey));
+    recordChunkCheckpoint(sourceChunk, 
PhoenixSyncTableCheckpointOutputRow.Status.MISMATCHED,
+      counters);
+    context.getCounter(SyncCounters.CHUNKS_MISMATCHED).increment(1);
+  }
+
+  /**
+   * Records a chunk checkpoint to the checkpoint table.
+   * @param sourceChunk Chunk information
+   * @param status      Status (VERIFIED or MISMATCHED)
+   * @param counters    Formatted counter string
+   */
+  private void recordChunkCheckpoint(ChunkInfo sourceChunk,
+    PhoenixSyncTableCheckpointOutputRow.Status status, String counters) throws 
SQLException {
+
     syncTableOutputRepository.checkpointSyncTableResult(
       new PhoenixSyncTableCheckpointOutputRow.Builder().setTableName(tableName)
         
.setTargetCluster(targetZkQuorum).setType(PhoenixSyncTableCheckpointOutputRow.Type.CHUNK)
         
.setFromTime(fromTime).setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun)
-        .setStartRowKey(sourceChunk.startKey).setEndRowKey(sourceChunk.endKey)
-        .setStatus(PhoenixSyncTableCheckpointOutputRow.Status.MISMATCHED)
+        
.setStartRowKey(sourceChunk.startKey).setEndRowKey(sourceChunk.endKey).setStatus(status)
         .setExecutionStartTime(sourceChunk.executionStartTime)
         
.setExecutionEndTime(sourceChunk.executionEndTime).setCounters(counters).build());
-
-    context.getCounter(SyncCounters.CHUNKS_MISMATCHED).increment(1);
   }
 
   /**
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java
index 90963c37a3..80eacde25e 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
@@ -115,6 +116,8 @@ public class PhoenixSyncTableTool extends Configured 
implements Tool {
   private static final Option READ_ALL_VERSIONS_OPTION = new Option("rav", 
"read-all-versions",
     false,
     "Enable reading all cell versions (optional, disabled by default, reads 
only latest version)");
+  private static final Option COALESCE_SPLIT_OPTION = new Option("coal", 
"coalesce-split", false,
+    "Enable split coalescing to reduce mapper count (optional, disabled by 
default)");
   private static final Option HELP_OPTION = new Option("h", "help", false, 
"Help");
 
   public static final String PHOENIX_SYNC_TABLE_NAME = 
"phoenix.sync.table.table.name";
@@ -124,7 +127,10 @@ public class PhoenixSyncTableTool extends Configured 
implements Tool {
   public static final String PHOENIX_SYNC_TABLE_DRY_RUN = 
"phoenix.sync.table.dry.run";
   public static final String PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES =
     "phoenix.sync.table.chunk.size.bytes";
-  public static final long DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES = 1024 
* 1024 * 1024; // 1GB
+  public static final long DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES = 1024 
* 1024 * 1024; // 1 GB
+  public static final String PHOENIX_SYNC_TABLE_SPLIT_COALESCING =
+    "phoenix.sync.table.split.coalescing";
+  public static final boolean DEFAULT_PHOENIX_SYNC_TABLE_SPLIT_COALESCING = 
false;
   public static final String PHOENIX_SYNC_TABLE_RAW_SCAN = 
"phoenix.sync.table.raw.scan";
   public static final String PHOENIX_SYNC_TABLE_READ_ALL_VERSIONS =
     "phoenix.sync.table.read.all.versions";
@@ -140,6 +146,7 @@ public class PhoenixSyncTableTool extends Configured 
implements Tool {
   private String tenantId;
   private boolean isRawScan = false;
   private boolean isReadAllVersions = false;
+  private boolean isCoalesceSplit = false;
 
   private String qTable;
   private String qSchemaName;
@@ -218,6 +225,7 @@ public class PhoenixSyncTableTool extends Configured 
implements Tool {
     setPhoenixSyncTableDryRun(configuration, isDryRun);
     setPhoenixSyncTableRawScan(configuration, isRawScan);
     setPhoenixSyncTableReadAllVersions(configuration, isReadAllVersions);
+    setPhoenixSyncTableSplitCoalescing(configuration, isCoalesceSplit);
     PhoenixConfigurationUtil.setSplitByStats(configuration, false);
     if (chunkSizeBytes != null) {
       setPhoenixSyncTableChunkSizeBytes(configuration, chunkSizeBytes);
@@ -292,6 +300,7 @@ public class PhoenixSyncTableTool extends Configured 
implements Tool {
     options.addOption(TENANT_ID_OPTION);
     options.addOption(RAW_SCAN_OPTION);
     options.addOption(READ_ALL_VERSIONS_OPTION);
+    options.addOption(COALESCE_SPLIT_OPTION);
     options.addOption(HELP_OPTION);
     return options;
   }
@@ -307,7 +316,7 @@ public class PhoenixSyncTableTool extends Configured 
implements Tool {
     formatter.printHelp(cmdLineSyntax,
       "Synchronize a Phoenix table between source and target clusters", 
options,
       "\nExample:\n" + cmdLineSyntax + " \\\n" + "  --table-name MY_TABLE \\\n"
-        + "  --target-cluster <zk_quorum>:2181 \\\n" + "  --dry-run\n",
+        + "  --target-cluster <zk_quorum>:2181 \\\n" + "  --dry-run \\\n" + "  
--coalesce-split\n",
       true);
     System.exit(exitCode);
   }
@@ -344,15 +353,16 @@ public class PhoenixSyncTableTool extends Configured 
implements Tool {
     isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
     isRawScan = cmdLine.hasOption(RAW_SCAN_OPTION.getOpt());
     isReadAllVersions = cmdLine.hasOption(READ_ALL_VERSIONS_OPTION.getOpt());
+    isCoalesceSplit = cmdLine.hasOption(COALESCE_SPLIT_OPTION.getOpt());
     qTable = SchemaUtil.getQualifiedTableName(schemaName, tableName);
     qSchemaName = SchemaUtil.normalizeIdentifier(schemaName);
     PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable);
     LOGGER.info(
       "PhoenixSyncTableTool configured - Table: {}, Schema: {}, Target: {}, "
         + "StartTime: {}, EndTime: {}, DryRun: {}, ChunkSize: {}, Foreground: 
{}, TenantId: {}, "
-        + "RawScan: {}, ReadAllVersions: {}",
+        + "RawScan: {}, ReadAllVersions: {}, CoalesceSplit: {}",
       qTable, qSchemaName, targetZkQuorum, startTime, endTime, isDryRun, 
chunkSizeBytes,
-      isForeground, tenantId, isRawScan, isReadAllVersions);
+      isForeground, tenantId, isRawScan, isReadAllVersions, isCoalesceSplit);
   }
 
   /**
@@ -396,6 +406,7 @@ public class PhoenixSyncTableTool extends Configured 
implements Tool {
     }
     Counters counters = job.getCounters();
     if (counters != null) {
+      long taskCreated = 
counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue();
       long verifiedMappers =
         
counters.findCounter(PhoenixSyncTableMapper.SyncCounters.MAPPERS_VERIFIED).getValue();
       long mismatchedMappers =
@@ -409,11 +420,11 @@ public class PhoenixSyncTableTool extends Configured 
implements Tool {
       long targetRowsProcessed =
         
counters.findCounter(PhoenixSyncTableMapper.SyncCounters.TARGET_ROWS_PROCESSED).getValue();
       LOGGER.info(
-        "PhoenixSyncTable job completed, gathered counters are \n Verified 
Mappers: {}, \n"
+        "PhoenixSyncTable job completed, gathered counters are \n Task 
Created: {}, \n Verified Mappers: {}, \n"
           + "Mismatched Mappers: {}, \n Chunks Verified: {}, \n"
           + "Chunks Mismatched: {}, \n Source Rows Processed: {}, \n Target 
Rows Processed: {}",
-        verifiedMappers, mismatchedMappers, chunksVerified, chunksMismatched, 
sourceRowsProcessed,
-        targetRowsProcessed);
+        taskCreated, verifiedMappers, mismatchedMappers, chunksVerified, 
chunksMismatched,
+        sourceRowsProcessed, targetRowsProcessed);
     } else {
       LOGGER.warn("Unable to retrieve job counters for table {} - job may have 
failed "
         + "during initialization", qTable);
@@ -535,6 +546,18 @@ public class PhoenixSyncTableTool extends Configured 
implements Tool {
     return conf.getBoolean(PHOENIX_SYNC_TABLE_READ_ALL_VERSIONS, false);
   }
 
+  public static void setPhoenixSyncTableSplitCoalescing(Configuration conf,
+    boolean splitCoalescing) {
+    Preconditions.checkNotNull(conf);
+    conf.setBoolean(PHOENIX_SYNC_TABLE_SPLIT_COALESCING, splitCoalescing);
+  }
+
+  public static boolean getPhoenixSyncTableSplitCoalescing(Configuration conf) 
{
+    Preconditions.checkNotNull(conf);
+    return conf.getBoolean(PHOENIX_SYNC_TABLE_SPLIT_COALESCING,
+      DEFAULT_PHOENIX_SYNC_TABLE_SPLIT_COALESCING);
+  }
+
   public Job getJob() {
     return job;
   }
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 2bb3d74bf9..10ece383a9 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -392,7 +392,6 @@
     <dependency>
       <groupId>org.bouncycastle</groupId>
       <artifactId>bcprov-jdk18on</artifactId>
-      <version>1.79</version>
       <scope>test</scope>
     </dependency>
     <!-- logging end -->
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixSyncTableToolIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixSyncTableToolIT.java
index fc277131ab..0661973dbe 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixSyncTableToolIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixSyncTableToolIT.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskCounter;
 import 
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDriver;
@@ -159,6 +160,7 @@ public class PhoenixSyncTableToolIT {
 
     validateSyncCounters(counters, 10, 10, 1, 3);
     validateMapperCounters(counters, 1, 3);
+    assertEquals("Expected 4 mapper task to be created", 4, 
counters.taskCreated);
 
     List<PhoenixSyncTableCheckpointOutputRow> checkpointEntries =
       queryCheckpointTable(sourceConnection, uniqueTableName, targetZkQuorum, 
null);
@@ -205,6 +207,8 @@ public class PhoenixSyncTableToolIT {
 
     validateSyncCounters(counters, 10, 7, 7, 3);
     validateMapperCounters(counters, 1, 3);
+    assertEquals("Should have only 1 Mapper task created with coalescing", 4, 
counters.taskCreated);
+
   }
 
   @Test
@@ -1244,10 +1248,8 @@ public class PhoenixSyncTableToolIT {
 
     // Configure paging with aggressive timeouts to force mid-chunk timeouts
     Configuration conf = new 
Configuration(CLUSTERS.getHBaseCluster1().getConfiguration());
-
-    long aggressiveRpcTimeout = 2L;
-    conf.setLong(QueryServices.SYNC_TABLE_RPC_TIMEOUT_ATTRIB, 
aggressiveRpcTimeout);
-    conf.setLong(HConstants.HBASE_RPC_TIMEOUT_KEY, aggressiveRpcTimeout);
+    conf.setBoolean(QueryServices.PHOENIX_SERVER_PAGING_ENABLED_ATTRIB, true);
+    conf.setLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 1);
 
     int chunkSize = 10240;
 
@@ -1307,15 +1309,10 @@ public class PhoenixSyncTableToolIT {
 
     // Configure paging with aggressive timeouts to force mid-chunk timeouts
     Configuration conf = new 
Configuration(CLUSTERS.getHBaseCluster1().getConfiguration());
-
-    // Enable server-side paging
     conf.setBoolean(QueryServices.PHOENIX_SERVER_PAGING_ENABLED_ATTRIB, true);
-    // Set extremely short rpc timeout to force frequent paging
-    long aggressiveRpcTimeout = 1L; // 1ms RPC timeout
-    conf.setLong(QueryServices.SYNC_TABLE_RPC_TIMEOUT_ATTRIB, 
aggressiveRpcTimeout);
-    conf.setLong(HConstants.HBASE_RPC_TIMEOUT_KEY, aggressiveRpcTimeout);
+    conf.setLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 1);
 
-    int chunkSize = 102400; // 100KB
+    int chunkSize = 10240;
 
     // Create a thread that will perform splits on source cluster during sync
     Thread sourceSplitThread = new Thread(() -> {
@@ -1751,6 +1748,29 @@ public class PhoenixSyncTableToolIT {
     validateMapperCounters(counters3, 3, 1);
   }
 
+  @Test
+  public void testSyncTableValidateWithSplitCoalescing() throws Exception {
+    setupStandardTestWithReplication(uniqueTableName, 1, 10);
+
+    introduceAndVerifyTargetDifferences(uniqueTableName);
+
+    // Enable split coalescing via command-line parameter, all regions will be 
coalesced into one
+    // mapper
+    Job job = runSyncTool(uniqueTableName, "--coalesce-split");
+    SyncCountersResult counters = getSyncCounters(job);
+
+    assertEquals("Should have only 1 Mapper task created with coalescing", 1, 
counters.taskCreated);
+
+    validateSyncCounters(counters, 10, 10, 7, 3);
+    validateMapperCounters(counters, 1, 3);
+
+    // Verify checkpoint entries are created correctly
+    List<PhoenixSyncTableCheckpointOutputRow> checkpointEntries =
+      queryCheckpointTable(sourceConnection, uniqueTableName, targetZkQuorum, 
null);
+    validateCheckpointEntries(checkpointEntries, uniqueTableName, 
targetZkQuorum, 10, 10, 7, 3, 4,
+      3, null);
+  }
+
   /**
    * Helper class to hold separated mapper and chunk entries.
    */
@@ -2327,6 +2347,7 @@ public class PhoenixSyncTableToolIT {
       LOGGER.info("Split completed for table {} at split point {} (bytes: 
{})", tableName, splitId,
         Bytes.toStringBinary(splitPoint));
     } catch (Exception e) {
+      // Ignore split failures - they don't affect the test's main goal
       LOGGER.warn("Failed to split table {} at split point {}: {}", tableName, 
splitId,
         e.getMessage());
     }
@@ -2576,6 +2597,7 @@ public class PhoenixSyncTableToolIT {
     public final long chunksVerified;
     public final long mappersVerified;
     public final long mappersMismatched;
+    public final long taskCreated;
 
     SyncCountersResult(Counters counters) {
       this.sourceRowsProcessed =
@@ -2586,6 +2608,7 @@ public class PhoenixSyncTableToolIT {
       this.chunksVerified = 
counters.findCounter(SyncCounters.CHUNKS_VERIFIED).getValue();
       this.mappersVerified = 
counters.findCounter(SyncCounters.MAPPERS_VERIFIED).getValue();
       this.mappersMismatched = 
counters.findCounter(SyncCounters.MAPPERS_MISMATCHED).getValue();
+      this.taskCreated = 
counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
     }
 
     public void logCounters(String testName) {
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixInputSplitTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixInputSplitTest.java
new file mode 100644
index 0000000000..d780e25b87
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixInputSplitTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.KeyRange;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PhoenixInputSplitTest {
+
+  /**
+   * Helper method to create a Scan with given row boundaries.
+   */
+  private Scan createScan(byte[] startRow, byte[] stopRow) {
+    Scan scan = new Scan();
+    scan.withStartRow(startRow, true);
+    scan.withStopRow(stopRow, false);
+    return scan;
+  }
+
+  /**
+   * Helper method to serialize and deserialize a PhoenixInputSplit.
+   */
+  private PhoenixInputSplit serializeAndDeserialize(PhoenixInputSplit split) 
throws IOException {
+    // Serialize
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(baos);
+    split.write(out);
+
+    // Deserialize
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+    DataInput in = new DataInputStream(bais);
+    PhoenixInputSplit deserialized = new PhoenixInputSplit();
+    deserialized.readFields(in);
+
+    return deserialized;
+  }
+
+  @Test
+  public void testStandardConstructorWithSingleScan() {
+    List<Scan> scans = new ArrayList<>();
+    scans.add(createScan(Bytes.toBytes("a"), Bytes.toBytes("d")));
+
+    PhoenixInputSplit split = new PhoenixInputSplit(scans);
+
+    assertFalse("Should not be coalesced with single scan", 
split.isCoalesced());
+    assertEquals("Should have 1 keyRange", 1, split.getKeyRanges().size());
+    assertNotNull("KeyRange should be initialized", split.getKeyRange());
+    assertTrue("KeyRange should span scan boundaries",
+      Bytes.equals(Bytes.toBytes("a"), split.getKeyRange().getLowerRange()));
+    assertTrue("KeyRange should span scan boundaries",
+      Bytes.equals(Bytes.toBytes("d"), split.getKeyRange().getUpperRange()));
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testStandardConstructorWithNullScans() {
+    new PhoenixInputSplit(null);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testStandardConstructorWithEmptyScans() {
+    new PhoenixInputSplit(Collections.emptyList());
+  }
+
+  @Test
+  public void testCoalescingConstructorWithMultipleRegions()
+    throws IOException, InterruptedException {
+    List<Scan> scans = new ArrayList<>();
+    scans.add(createScan(Bytes.toBytes("a"), Bytes.toBytes("d")));
+    scans.add(createScan(Bytes.toBytes("d"), Bytes.toBytes("g")));
+    scans.add(createScan(Bytes.toBytes("g"), Bytes.toBytes("j")));
+
+    long splitSize = 3072;
+    String regionLocation = "server1:16020";
+
+    // KeyRanges are now automatically derived from scans
+    PhoenixInputSplit split = new PhoenixInputSplit(scans, splitSize, 
regionLocation);
+
+    assertTrue("Should be coalesced with multiple regions", 
split.isCoalesced());
+    assertEquals("Should have 3 keyRanges (derived from scans)", 3, 
split.getKeyRanges().size());
+    assertEquals("Split size should match", splitSize, split.getLength());
+    assertArrayEquals("Region location should match", new String[] { 
regionLocation },
+      split.getLocations());
+
+    // Verify keyRanges were derived correctly from scans
+    List<KeyRange> keyRanges = split.getKeyRanges();
+    assertTrue("First keyRange should match first scan",
+      Bytes.equals(Bytes.toBytes("a"), keyRanges.get(0).getLowerRange()));
+    assertTrue("First keyRange should match first scan",
+      Bytes.equals(Bytes.toBytes("d"), keyRanges.get(0).getUpperRange()));
+    assertTrue("Second keyRange should match second scan",
+      Bytes.equals(Bytes.toBytes("d"), keyRanges.get(1).getLowerRange()));
+    assertTrue("Second keyRange should match second scan",
+      Bytes.equals(Bytes.toBytes("g"), keyRanges.get(1).getUpperRange()));
+    assertTrue("Third keyRange should match third scan",
+      Bytes.equals(Bytes.toBytes("g"), keyRanges.get(2).getLowerRange()));
+    assertTrue("Third keyRange should match third scan",
+      Bytes.equals(Bytes.toBytes("j"), keyRanges.get(2).getUpperRange()));
+  }
+
+  @Test
+  public void testSerializationWithSingleRegion() throws IOException, 
InterruptedException {
+    List<Scan> scans = new ArrayList<>();
+    scans.add(createScan(Bytes.toBytes("a"), Bytes.toBytes("d")));
+
+    PhoenixInputSplit original = new PhoenixInputSplit(scans, 1024, 
"server1:16020");
+
+    PhoenixInputSplit deserialized = serializeAndDeserialize(original);
+
+    assertFalse("Should not be coalesced after deserialization", 
deserialized.isCoalesced());
+    assertEquals("Should have 1 keyRange", 1, 
deserialized.getKeyRanges().size());
+    assertEquals("Split size should match", original.getLength(), 
deserialized.getLength());
+    assertArrayEquals("Region location should match", original.getLocations(),
+      deserialized.getLocations());
+    assertTrue("KeyRange should match", 
Bytes.equals(original.getKeyRange().getLowerRange(),
+      deserialized.getKeyRange().getLowerRange()));
+    assertTrue("KeyRange should match", 
Bytes.equals(original.getKeyRange().getUpperRange(),
+      deserialized.getKeyRange().getUpperRange()));
+  }
+
+  @Test
+  public void testSerializationWithCoalescedSplit() throws IOException, 
InterruptedException {
+    List<Scan> scans = new ArrayList<>();
+    scans.add(createScan(Bytes.toBytes("a"), Bytes.toBytes("d")));
+    scans.add(createScan(Bytes.toBytes("d"), Bytes.toBytes("g")));
+    scans.add(createScan(Bytes.toBytes("g"), Bytes.toBytes("j")));
+
+    // KeyRanges are now automatically derived from scans
+    PhoenixInputSplit original = new PhoenixInputSplit(scans, 3072, 
"server1:16020");
+
+    PhoenixInputSplit deserialized = serializeAndDeserialize(original);
+
+    assertTrue("Should be coalesced after deserialization", 
deserialized.isCoalesced());
+    assertEquals("Should have 3 keyRanges", 3, 
deserialized.getKeyRanges().size());
+    assertEquals("Split size should match", original.getLength(), 
deserialized.getLength());
+    assertArrayEquals("Region location should match", original.getLocations(),
+      deserialized.getLocations());
+
+    // Verify all keyRanges are preserved (derived from scans)
+    List<KeyRange> originalKeyRanges = original.getKeyRanges();
+    List<KeyRange> deserializedKeyRanges = deserialized.getKeyRanges();
+    for (int i = 0; i < originalKeyRanges.size(); i++) {
+      assertTrue("KeyRange " + i + " should match", Bytes.equals(
+        originalKeyRanges.get(i).getLowerRange(), 
deserializedKeyRanges.get(i).getLowerRange()));
+      assertTrue("KeyRange " + i + " should match", Bytes.equals(
+        originalKeyRanges.get(i).getUpperRange(), 
deserializedKeyRanges.get(i).getUpperRange()));
+    }
+  }
+
+  @Test
+  public void testGetLocations() throws IOException, InterruptedException {
+    List<Scan> scans =
+      Collections.singletonList(createScan(Bytes.toBytes("a"), 
Bytes.toBytes("d")));
+
+    // Test with null regionLocation
+    PhoenixInputSplit split1 = new PhoenixInputSplit(scans, 1024, null);
+    assertArrayEquals("Should return empty array for null location", new 
String[] {},
+      split1.getLocations());
+
+    // Test with valid regionLocation
+    PhoenixInputSplit split2 = new PhoenixInputSplit(scans, 1024, 
"server1:16020");
+    assertArrayEquals("Should return array with server location", new String[] 
{ "server1:16020" },
+      split2.getLocations());
+  }
+}
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormatTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormatTest.java
index 15e643feaf..95adb365a0 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormatTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormatTest.java
@@ -18,27 +18,46 @@
 package org.apache.phoenix.mapreduce;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.query.KeyRange;
+import org.junit.Before;
 import org.junit.Test;
 
 /**
  * Unit tests for PhoenixSyncTableInputFormat. Tests various scenarios of 
filtering completed splits
+ * and split coalescing functionality.
  */
 public class PhoenixSyncTableInputFormatTest {
 
   private PhoenixSyncTableInputFormat inputFormat = new 
PhoenixSyncTableInputFormat();
 
+  private ConnectionQueryServices mockQueryServices;
+  private byte[] physicalTableName = Bytes.toBytes("TEST_TABLE");
+
+  @Before
+  public void setup() throws Exception {
+    mockQueryServices = mock(ConnectionQueryServices.class);
+  }
+
   /**
    * Helper method to create a PhoenixInputSplit with given key range 
boundaries.
    */
@@ -280,4 +299,289 @@ public class PhoenixSyncTableInputFormatTest {
     assertTrue("Should return a PhoenixNoOpSingleRecordReader",
       reader instanceof PhoenixNoOpSingleRecordReader);
   }
+
+  @Test
+  public void testCoalesceSplitsWithSingleServer() throws Exception {
+    // Create 3 PhoenixInputSplits all on same server
+    List<InputSplit> splits = new ArrayList<>();
+    splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d")));
+    splits.add(createSplit(Bytes.toBytes("d"), Bytes.toBytes("g")));
+    splits.add(createSplit(Bytes.toBytes("g"), Bytes.toBytes("j")));
+
+    // Create mock region location - all splits on server1
+    HRegionLocation mockRegion = createMockRegionLocation("server1:16020", 
Bytes.toBytes("a"));
+
+    // Mock ConnectionQueryServices: all splits → server1
+    when(mockQueryServices.getTableRegionLocation(any(byte[].class), 
any(byte[].class)))
+      .thenReturn(mockRegion);
+
+    // Call coalesceSplits()
+    List<InputSplit> result =
+      inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName);
+
+    // Verify: 1 coalesced split (all on same server)
+    assertEquals("Should have 1 coalesced split (all on same server)", 1, 
result.size());
+
+    // Verify: Split is coalesced and contains 3 KeyRanges
+    PhoenixInputSplit coalescedSplit = (PhoenixInputSplit) result.get(0);
+
+    assertTrue("Split should be coalesced", coalescedSplit.isCoalesced());
+    assertEquals("Split should have 3 KeyRanges", 3, 
coalescedSplit.getKeyRanges().size());
+
+    // Verify: KeyRanges are sorted
+    List<KeyRange> keyRanges = coalescedSplit.getKeyRanges();
+    assertTrue("First KeyRange should start with 'a'",
+      Bytes.equals(Bytes.toBytes("a"), keyRanges.get(0).getLowerRange()));
+    assertTrue("Second KeyRange should start with 'd'",
+      Bytes.equals(Bytes.toBytes("d"), keyRanges.get(1).getLowerRange()));
+    assertTrue("Third KeyRange should start with 'g'",
+      Bytes.equals(Bytes.toBytes("g"), keyRanges.get(2).getLowerRange()));
+  }
+
+  @Test
+  public void testCoalesceSplitsWithMultipleServers() throws Exception {
+    // Create 6 PhoenixInputSplits
+    List<InputSplit> splits = new ArrayList<>();
+    splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("c")));
+    splits.add(createSplit(Bytes.toBytes("c"), Bytes.toBytes("e")));
+    splits.add(createSplit(Bytes.toBytes("e"), Bytes.toBytes("g")));
+    splits.add(createSplit(Bytes.toBytes("g"), Bytes.toBytes("i")));
+    splits.add(createSplit(Bytes.toBytes("i"), Bytes.toBytes("k")));
+    splits.add(createSplit(Bytes.toBytes("k"), Bytes.toBytes("m")));
+
+    // Create mock region locations BEFORE stubbing to avoid nested stubbing 
issues
+    HRegionLocation mockRegionA = createMockRegionLocation("server1:16020", 
Bytes.toBytes("a"));
+    HRegionLocation mockRegionC = createMockRegionLocation("server1:16020", 
Bytes.toBytes("c"));
+    HRegionLocation mockRegionE = createMockRegionLocation("server1:16020", 
Bytes.toBytes("e"));
+    HRegionLocation mockRegionG = createMockRegionLocation("server2:16020", 
Bytes.toBytes("g"));
+    HRegionLocation mockRegionI = createMockRegionLocation("server2:16020", 
Bytes.toBytes("i"));
+    HRegionLocation mockRegionK = createMockRegionLocation("server2:16020", 
Bytes.toBytes("k"));
+
+    // Mock ConnectionQueryServices: first 3 splits → server1, last 3 splits → 
server2
+    when(mockQueryServices.getTableRegionLocation(physicalTableName, 
Bytes.toBytes("a")))
+      .thenReturn(mockRegionA);
+    when(mockQueryServices.getTableRegionLocation(physicalTableName, 
Bytes.toBytes("c")))
+      .thenReturn(mockRegionC);
+    when(mockQueryServices.getTableRegionLocation(physicalTableName, 
Bytes.toBytes("e")))
+      .thenReturn(mockRegionE);
+    when(mockQueryServices.getTableRegionLocation(physicalTableName, 
Bytes.toBytes("g")))
+      .thenReturn(mockRegionG);
+    when(mockQueryServices.getTableRegionLocation(physicalTableName, 
Bytes.toBytes("i")))
+      .thenReturn(mockRegionI);
+    when(mockQueryServices.getTableRegionLocation(physicalTableName, 
Bytes.toBytes("k")))
+      .thenReturn(mockRegionK);
+
+    // Call coalesceSplits()
+    List<InputSplit> result =
+      inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName);
+
+    // Verify: 2 coalesced splits (one per server)
+    assertEquals("Should have 2 coalesced splits (one per server)", 2, 
result.size());
+
+    // Verify: Each split is coalesced and contains 3 KeyRanges
+    PhoenixInputSplit split1 = (PhoenixInputSplit) result.get(0);
+    PhoenixInputSplit split2 = (PhoenixInputSplit) result.get(1);
+
+    assertTrue("Split 1 should be coalesced", split1.isCoalesced());
+    assertTrue("Split 2 should be coalesced", split2.isCoalesced());
+
+    assertEquals("Split 1 should have 3 KeyRanges", 3, 
split1.getKeyRanges().size());
+    assertEquals("Split 2 should have 3 KeyRanges", 3, 
split2.getKeyRanges().size());
+
+    // Verify: Splits are sorted by start key within each server group
+    List<KeyRange> keyRanges1 = split1.getKeyRanges();
+    List<KeyRange> keyRanges2 = split2.getKeyRanges();
+
+    // Check that KeyRanges are sorted (each should be less than next)
+    for (int i = 0; i < keyRanges1.size() - 1; i++) {
+      assertTrue("KeyRanges in split 1 should be sorted",
+        Bytes.compareTo(keyRanges1.get(i).getLowerRange(), keyRanges1.get(i + 
1).getLowerRange())
+            < 0);
+    }
+    for (int i = 0; i < keyRanges2.size() - 1; i++) {
+      assertTrue("KeyRanges in split 2 should be sorted",
+        Bytes.compareTo(keyRanges2.get(i).getLowerRange(), keyRanges2.get(i + 
1).getLowerRange())
+            < 0);
+    }
+  }
+
+  @Test
+  public void testCoalesceSplitsWithEmptyList() throws Exception {
+    // Test edge case: empty input list
+    List<InputSplit> splits = new ArrayList<>();
+
+    List<InputSplit> result =
+      inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName);
+
+    assertEquals("Should return empty list for empty input", 0, result.size());
+  }
+
+  @Test
+  public void testCoalesceSplitsWithSingleSplit() throws Exception {
+    // Test edge case: single split (no coalescing needed)
+    List<InputSplit> splits = new ArrayList<>();
+    splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d")));
+
+    HRegionLocation mockRegion = createMockRegionLocation("server1:16020", 
Bytes.toBytes("a"));
+    when(mockQueryServices.getTableRegionLocation(any(byte[].class), 
any(byte[].class)))
+      .thenReturn(mockRegion);
+
+    List<InputSplit> result =
+      inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName);
+
+    assertEquals("Should return 1 split", 1, result.size());
+    PhoenixInputSplit resultSplit = (PhoenixInputSplit) result.get(0);
+    assertFalse("Single split should not be marked as coalesced", 
resultSplit.isCoalesced());
+    assertEquals("Should have 1 KeyRange", 1, 
resultSplit.getKeyRanges().size());
+  }
+
+  @Test
+  public void 
testCoalesceSplitsWithNullRegionLocationFallsBackToUnknownServer() throws 
Exception {
+    // Null location (e.g. region in transition) — split should be placed in 
UNKNOWN_SERVER bucket.
+    List<InputSplit> splits = new ArrayList<>();
+    splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d")));
+
+    when(mockQueryServices.getTableRegionLocation(any(byte[].class), 
any(byte[].class)))
+      .thenReturn(null);
+
+    List<InputSplit> result =
+      inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName);
+
+    assertEquals("Should return 1 split assigned to UNKNOWN_SERVER bucket", 1, 
result.size());
+    PhoenixInputSplit resultSplit = (PhoenixInputSplit) result.get(0);
+    // The split is coalesced under UNKNOWN_SERVER — location reflects that 
server name.
+    assertEquals("Split location should be UNKNOWN_SERVER",
+      PhoenixSyncTableInputFormat.UNKNOWN_SERVER, 
resultSplit.getLocations()[0]);
+  }
+
+  @Test
+  public void testCoalesceSplitsWithNullServerNameFallsBackToUnknownServer() 
throws Exception {
+    // Location present but serverName is null (e.g. region in transition) — 
same fallback.
+    List<InputSplit> splits = new ArrayList<>();
+    splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d")));
+
+    HRegionLocation nullServerLocation = mock(HRegionLocation.class);
+    when(nullServerLocation.getServerName()).thenReturn(null);
+    when(mockQueryServices.getTableRegionLocation(any(byte[].class), 
any(byte[].class)))
+      .thenReturn(nullServerLocation);
+
+    List<InputSplit> result =
+      inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName);
+
+    assertEquals("Should return 1 split assigned to UNKNOWN_SERVER bucket", 1, 
result.size());
+    PhoenixInputSplit resultSplit = (PhoenixInputSplit) result.get(0);
+    assertEquals("Split location should be UNKNOWN_SERVER",
+      PhoenixSyncTableInputFormat.UNKNOWN_SERVER, 
resultSplit.getLocations()[0]);
+  }
+
+  @Test
+  public void 
testCoalesceSplitsMultipleNullLocationsCoalescedIntoOneUnknownSplit()
+    throws Exception {
+    // Multiple splits with unavailable region location — all grouped into one 
UNKNOWN_SERVER split.
+    List<InputSplit> splits = new ArrayList<>();
+    splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d")));
+    splits.add(createSplit(Bytes.toBytes("d"), Bytes.toBytes("g")));
+    splits.add(createSplit(Bytes.toBytes("g"), Bytes.toBytes("j")));
+
+    when(mockQueryServices.getTableRegionLocation(any(byte[].class), 
any(byte[].class)))
+      .thenReturn(null);
+
+    List<InputSplit> result =
+      inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName);
+
+    assertEquals("All RIT splits should be coalesced into one UNKNOWN_SERVER 
split", 1,
+      result.size());
+    PhoenixInputSplit coalescedSplit = (PhoenixInputSplit) result.get(0);
+    assertTrue("UNKNOWN_SERVER split should be marked as coalesced", 
coalescedSplit.isCoalesced());
+    assertEquals("Should contain all 3 KeyRanges", 3, 
coalescedSplit.getKeyRanges().size());
+  }
+
+  @Test
+  public void testCoalesceSplitsMixedValidAndNullLocations() throws Exception {
+    // Scenario: 2 splits on server1, 2 splits with null location (RIT).
+    // Expected: 2 output splits — 1 coalesced on server1, 1 coalesced on 
UNKNOWN_SERVER.
+    // This is the most realistic production scenario: a region split mid-job 
affects only a
+    // subset of regions while the rest are healthy.
+    List<InputSplit> splits = new ArrayList<>();
+    splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d")));
+    splits.add(createSplit(Bytes.toBytes("d"), Bytes.toBytes("g")));
+    splits.add(createSplit(Bytes.toBytes("g"), Bytes.toBytes("j")));
+    splits.add(createSplit(Bytes.toBytes("j"), Bytes.toBytes("m")));
+
+    HRegionLocation mockRegion = createMockRegionLocation("server1:16020", 
Bytes.toBytes("a"));
+    when(mockQueryServices.getTableRegionLocation(physicalTableName, 
Bytes.toBytes("a")))
+      .thenReturn(mockRegion);
+    when(mockQueryServices.getTableRegionLocation(physicalTableName, 
Bytes.toBytes("d")))
+      .thenReturn(mockRegion);
+    when(mockQueryServices.getTableRegionLocation(physicalTableName, 
Bytes.toBytes("g")))
+      .thenReturn(null);
+    when(mockQueryServices.getTableRegionLocation(physicalTableName, 
Bytes.toBytes("j")))
+      .thenReturn(null);
+
+    List<InputSplit> result =
+      inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName);
+
+    assertEquals("Should produce 2 splits: server1 bucket + UNKNOWN_SERVER 
bucket", 2,
+      result.size());
+
+    // Collect split locations
+    List<String> locations = new ArrayList<>();
+    for (InputSplit s : result) {
+      locations.add(((PhoenixInputSplit) s).getLocations()[0]);
+    }
+
+    assertTrue("Should have a server1 split", 
locations.contains("server1:16020"));
+    assertTrue("Should have an UNKNOWN_SERVER split",
+      locations.contains(PhoenixSyncTableInputFormat.UNKNOWN_SERVER));
+
+    // Verify key range counts: server1 gets 2, UNKNOWN_SERVER gets 2
+    for (InputSplit s : result) {
+      PhoenixInputSplit ps = (PhoenixInputSplit) s;
+      String loc = ps.getLocations()[0];
+      if ("server1:16020".equals(loc)) {
+        assertEquals("server1 bucket should have 2 KeyRanges", 2, 
ps.getKeyRanges().size());
+      } else {
+        assertEquals("UNKNOWN_SERVER bucket should have 2 KeyRanges", 2, 
ps.getKeyRanges().size());
+      }
+    }
+  }
+
+  @Test
+  public void testCoalesceSplitsFailureThrowsForGenuineClusterError() throws 
Exception {
+    // A real cluster error (SQLException) should still propagate — not be 
swallowed.
+    List<InputSplit> splits = new ArrayList<>();
+    splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d")));
+
+    SQLException simulatedFailure =
+      new SQLException("Simulated RegionServer communication failure");
+    when(mockQueryServices.getTableRegionLocation(any(byte[].class), 
any(byte[].class)))
+      .thenThrow(simulatedFailure);
+
+    try {
+      inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName);
+      fail("Expected SQLException to be thrown for a genuine cluster error");
+    } catch (SQLException e) {
+      assertTrue("Exception message should be preserved",
+        e.getMessage().contains("Simulated RegionServer communication 
failure"));
+    }
+  }
+
+  /**
+   * Helper method to create a mock HRegionLocation with the given server 
address and start key.
+   */
+  private HRegionLocation createMockRegionLocation(String serverAddress, 
byte[] startKey) {
+    HRegionLocation mockRegionLocation = mock(HRegionLocation.class);
+    ServerName mockServerName = mock(ServerName.class);
+    // Create a real Address object instead of mocking it, since toString() is 
final
+    // Parse the serverAddress string to extract hostname and port
+    String[] parts = serverAddress.split(":");
+    String hostname = parts[0];
+    int port = parts.length > 1 ? Integer.parseInt(parts[1]) : 16020;
+    org.apache.hadoop.hbase.net.Address address =
+      org.apache.hadoop.hbase.net.Address.fromParts(hostname, port);
+
+    when(mockServerName.getAddress()).thenReturn(address);
+    when(mockRegionLocation.getServerName()).thenReturn(mockServerName);
+    return mockRegionLocation;
+  }
 }
diff --git a/pom.xml b/pom.xml
index 1205d5f158..4e28795841 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,6 +136,7 @@
     <junit.version>4.13.1</junit.version>
     <hdrhistogram.version>2.1.12</hdrhistogram.version>
     <byte-buddy.version>1.15.11</byte-buddy.version>
+    <bcprov-jdk18on.version>1.79</bcprov-jdk18on.version>
 
     <!-- These are only used for exclusion when shading, and the exact version 
is completely
     irrelevant, but we need to keep them up to date to appease static 
checkers. While Phoenix does
@@ -1361,6 +1362,11 @@
         <artifactId>jcodings</artifactId>
         <version>${jcodings.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.bouncycastle</groupId>
+        <artifactId>bcprov-jdk18on</artifactId>
+        <version>${bcprov-jdk18on.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.hdrhistogram</groupId>
         <artifactId>HdrHistogram</artifactId>

Reply via email to