This is an automated email from the ASF dual-hosted git repository.
haridsv pushed a commit to branch 5.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.3 by this push:
new 878910febf PHOENIX-7799 Coalesce splits by region server to avoid hotspotting from concurrent mappers (#2411) (#2430)
878910febf is described below
commit 878910febf9f8cd3b39b0d375d55434812c3df05
Author: Rahul Kumar <[email protected]>
AuthorDate: Wed Apr 29 19:06:54 2026 +0530
PHOENIX-7799 Coalesce splits by region server to avoid hotspotting from concurrent mappers (#2411) (#2430)
---------
Co-authored-by: Rahul Kumar <[email protected]>
---
phoenix-core-client/pom.xml | 1 -
phoenix-core-server/pom.xml | 1 -
.../phoenix/mapreduce/PhoenixInputSplit.java | 49 +++-
.../mapreduce/PhoenixSyncTableInputFormat.java | 141 +++++++++-
.../phoenix/mapreduce/PhoenixSyncTableMapper.java | 249 +++++++++++------
.../phoenix/mapreduce/PhoenixSyncTableTool.java | 37 ++-
phoenix-core/pom.xml | 1 -
.../phoenix/end2end/PhoenixSyncTableToolIT.java | 45 ++-
.../phoenix/mapreduce/PhoenixInputSplitTest.java | 200 ++++++++++++++
.../mapreduce/PhoenixSyncTableInputFormatTest.java | 304 +++++++++++++++++++++
pom.xml | 6 +
11 files changed, 913 insertions(+), 121 deletions(-)
diff --git a/phoenix-core-client/pom.xml b/phoenix-core-client/pom.xml
index ea9e303132..e1ba8727f3 100644
--- a/phoenix-core-client/pom.xml
+++ b/phoenix-core-client/pom.xml
@@ -253,7 +253,6 @@
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
- <version>1.79</version>
</dependency>
</dependencies>
diff --git a/phoenix-core-server/pom.xml b/phoenix-core-server/pom.xml
index 76757abb65..c906adce0d 100644
--- a/phoenix-core-server/pom.xml
+++ b/phoenix-core-server/pom.xml
@@ -176,7 +176,6 @@
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
- <version>1.79</version>
</dependency>
<dependency>
<groupId>org.xerial.snappy</groupId>
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
index c71e6ca5d1..c49ce59232 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
@@ -38,7 +38,7 @@ import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
public class PhoenixInputSplit extends InputSplit implements Writable {
private List<Scan> scans;
- private KeyRange keyRange;
+ private List<KeyRange> keyRanges;
private String regionLocation = null;
private long splitSize = 0;
@@ -68,13 +68,43 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
return scans;
}
+ /**
+ * Returns the overall KeyRange spanning this split. For coalesced splits,
spans from the first
+ * region's lower bound to the last region's upper bound. Computed on-demand
from keyRanges.
+ * @return KeyRange spanning the entire split, or null if keyRanges is empty
+ */
public KeyRange getKeyRange() {
- return keyRange;
+ if (keyRanges == null || keyRanges.isEmpty()) {
+ return null;
+ }
+ return KeyRange.getKeyRange(keyRanges.get(0).getLowerRange(),
+ keyRanges.get(keyRanges.size() - 1).getUpperRange());
+ }
+
+ /**
+ * Returns all KeyRanges for this split. For coalesced splits, returns multiple KeyRanges (one per
+ * region). For non-coalesced splits, returns a single-element list.
+ * @return List of KeyRanges, one per scan; may be null if init() has not populated it yet
+ */
+ public List<KeyRange> getKeyRanges() {
+ return keyRanges;
+ }
+
+ /**
+ * Checks if this split is coalesced (contains multiple regions). Null-safe, like getKeyRange().
+ * @return true if more than one KeyRange is held; false for zero/one KeyRange or uninitialized
+ */
+ public boolean isCoalesced() {
+ return keyRanges != null && keyRanges.size() > 1;
}
private void init() {
- this.keyRange =
- KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size()
- 1).getStopRow());
+ // Initialize keyRanges from scans
+ this.keyRanges = Lists.newArrayListWithExpectedSize(scans.size());
+ for (Scan scan : scans) {
+ KeyRange kr = KeyRange.getKeyRange(scan.getStartRow(),
scan.getStopRow());
+ this.keyRanges.add(kr);
+ }
}
@Override
@@ -126,7 +156,8 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + keyRange.hashCode();
+ KeyRange range = getKeyRange();
+ result = prime * result + (range == null ? 0 : range.hashCode());
return result;
}
@@ -142,11 +173,13 @@ public class PhoenixInputSplit extends InputSplit implements Writable {
return false;
}
PhoenixInputSplit other = (PhoenixInputSplit) obj;
- if (keyRange == null) {
- if (other.keyRange != null) {
+ KeyRange thisRange = getKeyRange();
+ KeyRange otherRange = other.getKeyRange();
+ if (thisRange == null) {
+ if (otherRange != null) {
return false;
}
- } else if (!keyRange.equals(other.keyRange)) {
+ } else if (!thisRange.equals(otherRange)) {
return false;
}
return true;
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java
index d717c3f7bb..b2dd739c0c 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormat.java
@@ -21,9 +21,13 @@ import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
@@ -31,8 +35,10 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +53,13 @@ public class PhoenixSyncTableInputFormat extends PhoenixInputFormat<DBWritable>
private static final Logger LOGGER =
LoggerFactory.getLogger(PhoenixSyncTableInputFormat.class);
+ // Sentinel server name used when a region location lookup returns null or
has a null server.
+ // This can happen transiently during a region-in-transition (RIT) event
(e.g. a split).
+ // Splits that cannot be placed on a specific server are coalesced together
under this key
+ // rather than failing the job, since split coalescing is an optimisation,
not a correctness
+ // requirement.
+ static final String UNKNOWN_SERVER = "UNKNOWN_SERVER";
+
public PhoenixSyncTableInputFormat() {
super();
}
@@ -94,15 +107,31 @@ public class PhoenixSyncTableInputFormat extends PhoenixInputFormat<DBWritable>
} catch (SQLException e) {
throw new RuntimeException(e);
}
- if (completedRegions.isEmpty()) {
- LOGGER.info("No completed regions for table {} - processing all {}
splits", tableName,
- allSplits.size());
- return allSplits;
- }
List<InputSplit> unprocessedSplits = filterCompletedSplits(allSplits,
completedRegions);
LOGGER.info("Found {} completed mapper regions for table {}, {}
unprocessed splits remaining",
completedRegions.size(), tableName, unprocessedSplits.size());
+
+ boolean enableSplitCoalescing =
+ conf.getBoolean(PhoenixSyncTableTool.PHOENIX_SYNC_TABLE_SPLIT_COALESCING,
+ PhoenixSyncTableTool.DEFAULT_PHOENIX_SYNC_TABLE_SPLIT_COALESCING);
+ LOGGER.info("Split coalescing enabled: {}, for table {}",
enableSplitCoalescing, tableName);
+
+ if (enableSplitCoalescing && unprocessedSplits.size() > 1) {
+ try (Connection conn = ConnectionUtil.getInputConnection(conf)) {
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ byte[] physicalTableName =
pConn.getTable(tableName).getPhysicalName().getBytes();
+ List<InputSplit> coalescedSplits =
+ coalesceSplits(unprocessedSplits, pConn.getQueryServices(),
physicalTableName);
+ LOGGER.info("Split coalescing: {} unprocessed splits {} coalesced
splits for table {}",
+ unprocessedSplits.size(), coalescedSplits.size(), tableName);
+ return coalescedSplits;
+ } catch (Exception e) {
+ throw new IOException(String.format("Failed to coalesce splits for
table %s. "
+ + "Split coalescing is enabled but failed due to: %s.", tableName,
e.getMessage()), e);
+ }
+ }
+
return unprocessedSplits;
}
@@ -133,6 +162,9 @@ public class PhoenixSyncTableInputFormat extends PhoenixInputFormat<DBWritable>
*/
List<InputSplit> filterCompletedSplits(List<InputSplit> allSplits,
List<KeyRange> completedRegions) {
+ if (completedRegions.isEmpty()) {
+ return allSplits;
+ }
allSplits.sort((s1, s2) -> {
PhoenixInputSplit ps1 = (PhoenixInputSplit) s1;
PhoenixInputSplit ps2 = (PhoenixInputSplit) s2;
@@ -211,4 +243,103 @@ public class PhoenixSyncTableInputFormat extends PhoenixInputFormat<DBWritable>
}
return unprocessedSplits;
}
+
+ /**
+ * Coalesces multiple region splits from the same RegionServer into single
InputSplits. All
+ * regions from the same server are coalesced into one split, regardless of
count or size. This
+ * reduces mapper count and avoids hot spotting when many concurrent mappers
hit the same server.
+ * @param unprocessedSplits Splits remaining after filtering completed
regions
+ * @param queryServices ConnectionQueryServices for querying region
locations
+ * @param physicalTableName Physical HBase table name
+ * @return Coalesced splits with all regions per server combined into one
split
+ */
+ List<InputSplit> coalesceSplits(List<InputSplit> unprocessedSplits,
+ ConnectionQueryServices queryServices, byte[] physicalTableName)
+ throws IOException, InterruptedException, SQLException {
+ // Group splits by RegionServer location
+ Map<String, List<PhoenixInputSplit>> splitsByServer =
+ groupSplitsByServer(unprocessedSplits, queryServices, physicalTableName);
+
+ List<InputSplit> coalescedSplits = new ArrayList<>();
+
+ // For each RegionServer, create one coalesced split with ALL regions from
that server
+ for (Map.Entry<String, List<PhoenixInputSplit>> entry :
splitsByServer.entrySet()) {
+ String serverName = entry.getKey();
+ List<PhoenixInputSplit> serverSplits = entry.getValue();
+
+ // Sort splits by start key for sequential processing
+ serverSplits.sort((s1, s2) ->
Bytes.compareTo(s1.getKeyRange().getLowerRange(),
+ s2.getKeyRange().getLowerRange()));
+ // Create single coalesced split with ALL regions from this server
+ coalescedSplits.add(createCoalescedSplit(serverSplits, serverName));
+ }
+
+ return coalescedSplits;
+ }
+
+ /**
+ * Groups splits by RegionServer location for locality-aware coalescing. Uses
+ * ConnectionQueryServices to determine which server hosts each region.
+ * <p>
+ * If the region location is unavailable (null location or null server
name), which can happen
+ * transiently during a region-in-transition (RIT) event such as a split,
the split is assigned to
+ * {@link #UNKNOWN_SERVER} rather than failing the job. Since split
coalescing is an optimisation,
+ * a transient lookup failure should degrade gracefully, not abort the MR
job.
+ * @param splits List of splits to group
+ * @param queryServices ConnectionQueryServices for querying region
locations
+ * @param physicalTableName Physical HBase table name
+ * @return Map of server name to list of splits hosted on that server
+ */
+ private Map<String, List<PhoenixInputSplit>>
groupSplitsByServer(List<InputSplit> splits,
+ ConnectionQueryServices queryServices, byte[] physicalTableName)
+ throws IOException, SQLException {
+ Map<String, List<PhoenixInputSplit>> splitsByServer = new HashMap<>();
+ for (InputSplit split : splits) {
+ PhoenixInputSplit pSplit = (PhoenixInputSplit) split;
+ KeyRange keyRange = pSplit.getKeyRange();
+ HRegionLocation regionLocation =
+ queryServices.getTableRegionLocation(physicalTableName,
keyRange.getLowerRange());
+ String serverName;
+ if (regionLocation == null || regionLocation.getServerName() == null) {
+ LOGGER.warn(
+ "Could not determine region server for key: {}. "
+ + "Region may be in transition. Assigning split to {} bucket.",
+ Bytes.toStringBinary(keyRange.getLowerRange()), UNKNOWN_SERVER);
+ serverName = UNKNOWN_SERVER;
+ } else {
+ serverName = regionLocation.getServerName().getAddress().toString();
+ }
+ splitsByServer.computeIfAbsent(serverName, k -> new
ArrayList<>()).add(pSplit);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Split {} assigned to server {}",
+ Bytes.toStringBinary(keyRange.getLowerRange()), serverName);
+ }
+ }
+
+ return splitsByServer;
+ }
+
+ /**
+ * Creates a coalesced PhoenixInputSplit containing multiple regions.
Combines scans and KeyRanges
+ * from individual splits into a single split.
+ * @param splits List of splits to coalesce (from same RegionServer)
+ * @param serverLocation RegionServer location for data locality
+ * @return Coalesced PhoenixInputSplit
+ */
+ private PhoenixInputSplit createCoalescedSplit(List<PhoenixInputSplit>
splits,
+ String serverLocation) throws IOException, InterruptedException {
+
+ List<Scan> allScans = new ArrayList<>();
+ long totalSize = 0;
+ // Extract all scans from individual splits
+ for (PhoenixInputSplit split : splits) {
+ allScans.addAll(split.getScans());
+ totalSize += split.getLength();
+ }
+
+ LOGGER.info("Created coalesced split with {} regions, {} MB from server
{}", splits.size(),
+ totalSize / (1024 * 1024), serverLocation);
+ // Create a new PhoenixInputSplit, keyRanges will be derived from scans in
init()
+ return new PhoenixInputSplit(allScans, totalSize, serverLocation);
+ }
}
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java
index acaac01fc3..65e932ae78 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableMapper.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -45,7 +44,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.util.ConnectionUtil;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.KeyRange;
-import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.MetaDataUtil;
import org.apache.phoenix.util.PhoenixRuntime;
@@ -88,16 +86,13 @@ public class PhoenixSyncTableMapper
private Connection globalConnection;
private PTable pTable;
private byte[] physicalTableName;
- private byte[] mapperRegionStart;
- private byte[] mapperRegionEnd;
+ private List<KeyRange> regionKeyRanges;
private PhoenixSyncTableOutputRepository syncTableOutputRepository;
- private Timestamp mapperStartTime;
@Override
protected void setup(Context context) throws InterruptedException {
try {
super.setup(context);
- mapperStartTime = new Timestamp(System.currentTimeMillis());
this.conf = context.getConfiguration();
tableName = PhoenixSyncTableTool.getPhoenixSyncTableName(conf);
targetZkQuorum =
PhoenixSyncTableTool.getPhoenixSyncTableTargetZkQuorum(conf);
@@ -123,18 +118,25 @@ public class PhoenixSyncTableMapper
}
/**
- * Extracts mapper region boundaries from the PhoenixInputSplit
+ * Extracts region key ranges from the PhoenixInputSplit. Handles both
single-region splits and
+ * coalesced splits with multiple regions.
*/
private void extractRegionBoundariesFromSplit(Context context) {
PhoenixInputSplit split = (PhoenixInputSplit) context.getInputSplit();
- KeyRange keyRange = split.getKeyRange();
- if (keyRange == null) {
+ regionKeyRanges = split.getKeyRanges();
+
+ if (regionKeyRanges == null || regionKeyRanges.isEmpty()) {
throw new IllegalStateException(String.format(
- "PhoenixInputSplit has no KeyRange for table: %s . Cannot determine
region boundaries for sync operation.",
+ "PhoenixInputSplit has no KeyRanges for table: %s. Cannot determine
region boundaries for sync operation.",
tableName));
}
- mapperRegionStart = keyRange.getLowerRange();
- mapperRegionEnd = keyRange.getUpperRange();
+
+ if (split.isCoalesced()) {
+ LOGGER.info("Mapper processing coalesced split with {} regions for table
{}",
+ regionKeyRanges.size(), tableName);
+ } else {
+ LOGGER.info("Mapper processing single region split for table {}",
tableName);
+ }
}
/**
@@ -156,67 +158,23 @@ public class PhoenixSyncTableMapper
}
/**
- * Processes a mapper region by comparing chunks between source and target
clusters. Gets already
- * processed chunks from checkpoint table, resumes from check pointed
progress and records final
- * status for chunks & mapper (VERIFIED/MISMATCHED).
+ * Processes mapper region(s) by comparing chunks between source and target
clusters. For
+ * coalesced splits, processes each region sequentially. Gets already
processed chunks from
+ * checkpoint table, resumes from check pointed progress and records final
status for chunks &
+ * mapper (VERIFIED/MISMATCHED).
*/
@Override
protected void map(NullWritable key, DBInputFormat.NullDBWritable value,
Context context)
throws IOException, InterruptedException {
+ context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
try {
- List<PhoenixSyncTableCheckpointOutputRow> processedChunks =
- syncTableOutputRepository.getProcessedChunks(tableName,
targetZkQuorum, fromTime, toTime,
- tenantId, mapperRegionStart, mapperRegionEnd);
- List<KeyRange> unprocessedRanges =
- calculateUnprocessedRanges(mapperRegionStart, mapperRegionEnd,
processedChunks);
- boolean isStartKeyInclusive =
shouldStartKeyBeInclusive(mapperRegionStart, processedChunks);
- for (KeyRange range : unprocessedRanges) {
- processMapperRanges(range.getLowerRange(), range.getUpperRange(),
isStartKeyInclusive,
- context);
- isStartKeyInclusive = false;
- }
-
- long chunksMismatched =
context.getCounter(SyncCounters.CHUNKS_MISMATCHED).getValue();
- long chunksVerified =
context.getCounter(SyncCounters.CHUNKS_VERIFIED).getValue();
- long sourceRowsProcessed =
context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).getValue();
- long targetRowsProcessed =
context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).getValue();
- Timestamp mapperEndTime = new Timestamp(System.currentTimeMillis());
- String counters = PhoenixSyncTableCheckpointOutputRow.CounterFormatter
- .formatMapper(chunksVerified, chunksMismatched, sourceRowsProcessed,
targetRowsProcessed);
- if (sourceRowsProcessed > 0) {
- if (chunksMismatched == 0) {
- context.getCounter(SyncCounters.MAPPERS_VERIFIED).increment(1);
- syncTableOutputRepository
- .checkpointSyncTableResult(new
PhoenixSyncTableCheckpointOutputRow.Builder()
- .setTableName(tableName).setTargetCluster(targetZkQuorum)
-
.setType(PhoenixSyncTableCheckpointOutputRow.Type.REGION).setFromTime(fromTime)
- .setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun)
- .setStartRowKey(mapperRegionStart).setEndRowKey(mapperRegionEnd)
- .setStatus(PhoenixSyncTableCheckpointOutputRow.Status.VERIFIED)
-
.setExecutionStartTime(mapperStartTime).setExecutionEndTime(mapperEndTime)
- .setCounters(counters).build());
- LOGGER.info(
- "PhoenixSyncTable mapper completed with verified: {} verified
chunks, {} mismatched chunks",
- chunksVerified, chunksMismatched);
- } else {
- context.getCounter(SyncCounters.MAPPERS_MISMATCHED).increment(1);
- LOGGER.warn(
- "PhoenixSyncTable mapper completed with mismatch: {} verified
chunks, {} mismatched chunks",
- chunksVerified, chunksMismatched);
- syncTableOutputRepository
- .checkpointSyncTableResult(new
PhoenixSyncTableCheckpointOutputRow.Builder()
- .setTableName(tableName).setTargetCluster(targetZkQuorum)
-
.setType(PhoenixSyncTableCheckpointOutputRow.Type.REGION).setFromTime(fromTime)
- .setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun)
- .setStartRowKey(mapperRegionStart).setEndRowKey(mapperRegionEnd)
- .setStatus(PhoenixSyncTableCheckpointOutputRow.Status.MISMATCHED)
-
.setExecutionStartTime(mapperStartTime).setExecutionEndTime(mapperEndTime)
- .setCounters(counters).build());
- }
- } else {
- LOGGER.info(
- "No rows pending to process. All mapper region boundaries are
covered for startKey:{}, endKey: {}",
- mapperRegionStart, mapperRegionEnd);
+ // Process each region in the split (one or multiple for coalesced
splits)
+ for (KeyRange keyRange : regionKeyRanges) {
+ byte[] regionStart = keyRange.getLowerRange();
+ byte[] regionEnd = keyRange.getUpperRange();
+ LOGGER.info("Processing region [{}, {}) from split for table {}",
+ Bytes.toStringBinary(regionStart), Bytes.toStringBinary(regionEnd),
tableName);
+ processRegion(regionStart, regionEnd, context);
}
} catch (SQLException e) {
tryClosingResources();
@@ -224,6 +182,124 @@ public class PhoenixSyncTableMapper
}
}
+ /**
+ * Processes a single region within a split (could be part of a coalesced
split).
+ * @param regionStart Start key of the region
+ * @param regionEnd End key of the region
+ * @param context Mapper context
+ */
+ private void processRegion(byte[] regionStart, byte[] regionEnd, Context
context)
+ throws SQLException, IOException, InterruptedException {
+
+ Timestamp regionStartTime = new Timestamp(System.currentTimeMillis());
+
+ // Get processed chunks for this specific region
+ List<PhoenixSyncTableCheckpointOutputRow> processedChunks =
+ syncTableOutputRepository.getProcessedChunks(tableName, targetZkQuorum,
fromTime, toTime,
+ tenantId, regionStart, regionEnd);
+
+ // Calculate unprocessed ranges within this region
+ List<KeyRange> unprocessedRanges =
+ calculateUnprocessedRanges(regionStart, regionEnd, processedChunks);
+
+ // Track counters before processing this region
+ long verifiedBefore =
context.getCounter(SyncCounters.CHUNKS_VERIFIED).getValue();
+ long mismatchedBefore =
context.getCounter(SyncCounters.CHUNKS_MISMATCHED).getValue();
+ long sourceRowsBefore =
context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).getValue();
+ long targetRowsBefore =
context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).getValue();
+
+ // Process all unprocessed ranges in this region
+ boolean isStartKeyInclusive = shouldStartKeyBeInclusive(regionStart,
processedChunks);
+ for (KeyRange range : unprocessedRanges) {
+ processMapperRanges(range.getLowerRange(), range.getUpperRange(),
isStartKeyInclusive,
+ context);
+ isStartKeyInclusive = false;
+ }
+
+ // Calculate counters for this region only
+ long verifiedChunks =
+ context.getCounter(SyncCounters.CHUNKS_VERIFIED).getValue() -
verifiedBefore;
+ long mismatchedChunks =
+ context.getCounter(SyncCounters.CHUNKS_MISMATCHED).getValue() -
mismatchedBefore;
+ long sourceRowsProcessed =
+ context.getCounter(SyncCounters.SOURCE_ROWS_PROCESSED).getValue() -
sourceRowsBefore;
+ long targetRowsProcessed =
+ context.getCounter(SyncCounters.TARGET_ROWS_PROCESSED).getValue() -
targetRowsBefore;
+
+ Timestamp regionEndTime = new Timestamp(System.currentTimeMillis());
+ String counters = PhoenixSyncTableCheckpointOutputRow.CounterFormatter
+ .formatMapper(verifiedChunks, mismatchedChunks, sourceRowsProcessed,
targetRowsProcessed);
+ if (sourceRowsProcessed > 0) {
+ recordRegionCompletion(regionStart, regionEnd, regionStartTime,
regionEndTime, verifiedChunks,
+ mismatchedChunks, counters, context);
+ } else {
+ LOGGER.info(
+ "No rows pending to process. All region boundaries are covered for
startKey:{}, endKey: {}",
+ Bytes.toStringBinary(regionStart), Bytes.toStringBinary(regionEnd));
+ }
+ }
+
+ /**
+ * Records region completion by updating counters, recording checkpoint, and
logging result.
+ * Consolidates all region completion logic to eliminate duplication.
+ * @param regionStart Region start key
+ * @param regionEnd Region end key
+ * @param regionStartTime Region processing start time
+ * @param regionEndTime Region processing end time
+ * @param verifiedChunks Number of verified chunks
+ * @param mismatchedChunks Number of mismatched chunks
+ * @param counters Formatted counter string
+ * @param context Mapper context
+ */
+ private void recordRegionCompletion(byte[] regionStart, byte[] regionEnd,
+ Timestamp regionStartTime, Timestamp regionEndTime, long verifiedChunks,
long mismatchedChunks,
+ String counters, Context context) throws SQLException {
+
+ boolean isVerified = mismatchedChunks == 0;
+ PhoenixSyncTableCheckpointOutputRow.Status status = isVerified
+ ? PhoenixSyncTableCheckpointOutputRow.Status.VERIFIED
+ : PhoenixSyncTableCheckpointOutputRow.Status.MISMATCHED;
+
+ context.getCounter(isVerified ? SyncCounters.MAPPERS_VERIFIED :
SyncCounters.MAPPERS_MISMATCHED)
+ .increment(1);
+
+ recordRegionCheckpoint(regionStart, regionEnd, status, regionStartTime,
regionEndTime,
+ counters);
+
+ String logMessage = String.format(
+ "PhoenixSyncTable region [%s, %s) completed with %s: %d verified chunks,
%d mismatched chunks",
+ Bytes.toStringBinary(regionStart), Bytes.toStringBinary(regionEnd),
+ isVerified ? "verified" : "mismatch", verifiedChunks, mismatchedChunks);
+
+ if (isVerified) {
+ LOGGER.info(logMessage);
+ } else {
+ LOGGER.warn(logMessage);
+ }
+ }
+
+ /**
+ * Records a region checkpoint to the checkpoint table.
+ * @param regionStart Region start key
+ * @param regionEnd Region end key
+ * @param status Status (VERIFIED or MISMATCHED)
+ * @param regionStartTime Region processing start time
+ * @param regionEndTime Region processing end time
+ * @param counters Formatted counter string
+ */
+ private void recordRegionCheckpoint(byte[] regionStart, byte[] regionEnd,
+ PhoenixSyncTableCheckpointOutputRow.Status status, Timestamp
regionStartTime,
+ Timestamp regionEndTime, String counters) throws SQLException {
+
+ syncTableOutputRepository
+ .checkpointSyncTableResult(new
PhoenixSyncTableCheckpointOutputRow.Builder()
+ .setTableName(tableName).setTargetCluster(targetZkQuorum)
+
.setType(PhoenixSyncTableCheckpointOutputRow.Type.REGION).setFromTime(fromTime)
+
.setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun).setStartRowKey(regionStart)
+
.setEndRowKey(regionEnd).setStatus(status).setExecutionStartTime(regionStartTime)
+ .setExecutionEndTime(regionEndTime).setCounters(counters).build());
+ }
+
/**
* Processes a chunk range by comparing source and target cluster data.
Source chunking: Breaks
* data into size-based chunks within given mapper region boundary. Target
chunking: Follows
@@ -408,13 +484,7 @@ public class PhoenixSyncTableMapper
scan.setAttribute(BaseScannerRegionObserverConstants.SYNC_TABLE_CHUNK_SIZE_BYTES,
Bytes.toBytes(chunkSizeBytes));
}
- // Use the half of the HBase RPC timeout value as the server page size to
make sure
- // that the HBase region server will be able to send a heartbeat message
to the
- // client before the client times out.
- long syncTablePageTimeoutMs = (long)
(conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
- QueryServicesOptions.DEFAULT_SYNC_TABLE_RPC_TIMEOUT) * 0.5);
- scan.setAttribute(BaseScannerRegionObserverConstants.SERVER_PAGE_SIZE_MS,
- Bytes.toBytes(syncTablePageTimeoutMs));
+ ScanUtil.setScanAttributeForPaging(scan, phoenixConn);
ResultScanner scanner = hTable.getScanner(scan);
return new ChunkScannerContext(hTable, scanner);
}
@@ -453,14 +523,8 @@ public class PhoenixSyncTableMapper
private void handleVerifiedChunk(ChunkInfo sourceChunk, Context context,
String counters)
throws SQLException {
- syncTableOutputRepository.checkpointSyncTableResult(
- new PhoenixSyncTableCheckpointOutputRow.Builder().setTableName(tableName)
-
.setTargetCluster(targetZkQuorum).setType(PhoenixSyncTableCheckpointOutputRow.Type.CHUNK)
-
.setFromTime(fromTime).setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun)
- .setStartRowKey(sourceChunk.startKey).setEndRowKey(sourceChunk.endKey)
- .setStatus(PhoenixSyncTableCheckpointOutputRow.Status.VERIFIED)
- .setExecutionStartTime(sourceChunk.executionStartTime)
-
.setExecutionEndTime(sourceChunk.executionEndTime).setCounters(counters).build());
+ recordChunkCheckpoint(sourceChunk,
PhoenixSyncTableCheckpointOutputRow.Status.VERIFIED,
+ counters);
context.getCounter(SyncCounters.CHUNKS_VERIFIED).increment(1);
}
@@ -468,16 +532,27 @@ public class PhoenixSyncTableMapper
throws SQLException {
LOGGER.warn("Chunk mismatch detected for table: {}, with startKey: {},
endKey {}", tableName,
Bytes.toStringBinary(sourceChunk.startKey),
Bytes.toStringBinary(sourceChunk.endKey));
+ recordChunkCheckpoint(sourceChunk,
PhoenixSyncTableCheckpointOutputRow.Status.MISMATCHED,
+ counters);
+ context.getCounter(SyncCounters.CHUNKS_MISMATCHED).increment(1);
+ }
+
+ /**
+ * Records a chunk checkpoint to the checkpoint table.
+ * @param sourceChunk Chunk information
+ * @param status Status (VERIFIED or MISMATCHED)
+ * @param counters Formatted counter string
+ */
+ private void recordChunkCheckpoint(ChunkInfo sourceChunk,
+ PhoenixSyncTableCheckpointOutputRow.Status status, String counters) throws
SQLException {
+
syncTableOutputRepository.checkpointSyncTableResult(
new PhoenixSyncTableCheckpointOutputRow.Builder().setTableName(tableName)
.setTargetCluster(targetZkQuorum).setType(PhoenixSyncTableCheckpointOutputRow.Type.CHUNK)
.setFromTime(fromTime).setToTime(toTime).setTenantId(tenantId).setIsDryRun(isDryRun)
- .setStartRowKey(sourceChunk.startKey).setEndRowKey(sourceChunk.endKey)
- .setStatus(PhoenixSyncTableCheckpointOutputRow.Status.MISMATCHED)
+
.setStartRowKey(sourceChunk.startKey).setEndRowKey(sourceChunk.endKey).setStatus(status)
.setExecutionStartTime(sourceChunk.executionStartTime)
.setExecutionEndTime(sourceChunk.executionEndTime).setCounters(counters).build());
-
- context.getCounter(SyncCounters.CHUNKS_MISMATCHED).increment(1);
}
/**
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java
index 90963c37a3..80eacde25e 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/mapreduce/PhoenixSyncTableTool.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
@@ -115,6 +116,8 @@ public class PhoenixSyncTableTool extends Configured implements Tool {
private static final Option READ_ALL_VERSIONS_OPTION = new Option("rav",
"read-all-versions",
false,
"Enable reading all cell versions (optional, disabled by default, reads
only latest version)");
+ private static final Option COALESCE_SPLIT_OPTION = new Option("coal",
"coalesce-split", false,
+ "Enable split coalescing to reduce mapper count (optional, disabled by
default)");
private static final Option HELP_OPTION = new Option("h", "help", false,
"Help");
public static final String PHOENIX_SYNC_TABLE_NAME =
"phoenix.sync.table.table.name";
@@ -124,7 +127,10 @@ public class PhoenixSyncTableTool extends Configured
implements Tool {
public static final String PHOENIX_SYNC_TABLE_DRY_RUN =
"phoenix.sync.table.dry.run";
public static final String PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES =
"phoenix.sync.table.chunk.size.bytes";
- public static final long DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES = 1024
* 1024 * 1024; // 1GB
+ public static final long DEFAULT_PHOENIX_SYNC_TABLE_CHUNK_SIZE_BYTES = 1024
* 1024 * 1024; // 1 GB
+ public static final String PHOENIX_SYNC_TABLE_SPLIT_COALESCING =
+ "phoenix.sync.table.split.coalescing";
+ public static final boolean DEFAULT_PHOENIX_SYNC_TABLE_SPLIT_COALESCING =
false;
public static final String PHOENIX_SYNC_TABLE_RAW_SCAN =
"phoenix.sync.table.raw.scan";
public static final String PHOENIX_SYNC_TABLE_READ_ALL_VERSIONS =
"phoenix.sync.table.read.all.versions";
@@ -140,6 +146,7 @@ public class PhoenixSyncTableTool extends Configured
implements Tool {
private String tenantId;
private boolean isRawScan = false;
private boolean isReadAllVersions = false;
+ private boolean isCoalesceSplit = false;
private String qTable;
private String qSchemaName;
@@ -218,6 +225,7 @@ public class PhoenixSyncTableTool extends Configured
implements Tool {
setPhoenixSyncTableDryRun(configuration, isDryRun);
setPhoenixSyncTableRawScan(configuration, isRawScan);
setPhoenixSyncTableReadAllVersions(configuration, isReadAllVersions);
+ setPhoenixSyncTableSplitCoalescing(configuration, isCoalesceSplit);
PhoenixConfigurationUtil.setSplitByStats(configuration, false);
if (chunkSizeBytes != null) {
setPhoenixSyncTableChunkSizeBytes(configuration, chunkSizeBytes);
@@ -292,6 +300,7 @@ public class PhoenixSyncTableTool extends Configured
implements Tool {
options.addOption(TENANT_ID_OPTION);
options.addOption(RAW_SCAN_OPTION);
options.addOption(READ_ALL_VERSIONS_OPTION);
+ options.addOption(COALESCE_SPLIT_OPTION);
options.addOption(HELP_OPTION);
return options;
}
@@ -307,7 +316,7 @@ public class PhoenixSyncTableTool extends Configured
implements Tool {
formatter.printHelp(cmdLineSyntax,
"Synchronize a Phoenix table between source and target clusters",
options,
"\nExample:\n" + cmdLineSyntax + " \\\n" + " --table-name MY_TABLE \\\n"
- + " --target-cluster <zk_quorum>:2181 \\\n" + " --dry-run\n",
+ + " --target-cluster <zk_quorum>:2181 \\\n" + " --dry-run \\\n" + "
--coalesce-split\n",
true);
System.exit(exitCode);
}
@@ -344,15 +353,16 @@ public class PhoenixSyncTableTool extends Configured
implements Tool {
isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
isRawScan = cmdLine.hasOption(RAW_SCAN_OPTION.getOpt());
isReadAllVersions = cmdLine.hasOption(READ_ALL_VERSIONS_OPTION.getOpt());
+ isCoalesceSplit = cmdLine.hasOption(COALESCE_SPLIT_OPTION.getOpt());
qTable = SchemaUtil.getQualifiedTableName(schemaName, tableName);
qSchemaName = SchemaUtil.normalizeIdentifier(schemaName);
PhoenixMapReduceUtil.validateTimeRange(startTime, endTime, qTable);
LOGGER.info(
"PhoenixSyncTableTool configured - Table: {}, Schema: {}, Target: {}, "
+ "StartTime: {}, EndTime: {}, DryRun: {}, ChunkSize: {}, Foreground:
{}, TenantId: {}, "
- + "RawScan: {}, ReadAllVersions: {}",
+ + "RawScan: {}, ReadAllVersions: {}, CoalesceSplit: {}",
qTable, qSchemaName, targetZkQuorum, startTime, endTime, isDryRun,
chunkSizeBytes,
- isForeground, tenantId, isRawScan, isReadAllVersions);
+ isForeground, tenantId, isRawScan, isReadAllVersions, isCoalesceSplit);
}
/**
@@ -396,6 +406,7 @@ public class PhoenixSyncTableTool extends Configured
implements Tool {
}
Counters counters = job.getCounters();
if (counters != null) {
+ long taskCreated =
counters.findCounter(JobCounter.TOTAL_LAUNCHED_MAPS).getValue();
long verifiedMappers =
counters.findCounter(PhoenixSyncTableMapper.SyncCounters.MAPPERS_VERIFIED).getValue();
long mismatchedMappers =
@@ -409,11 +420,11 @@ public class PhoenixSyncTableTool extends Configured
implements Tool {
long targetRowsProcessed =
counters.findCounter(PhoenixSyncTableMapper.SyncCounters.TARGET_ROWS_PROCESSED).getValue();
LOGGER.info(
- "PhoenixSyncTable job completed, gathered counters are \n Verified
Mappers: {}, \n"
+ "PhoenixSyncTable job completed, gathered counters are \n Task
Created: {}, \n Verified Mappers: {}, \n"
+ "Mismatched Mappers: {}, \n Chunks Verified: {}, \n"
+ "Chunks Mismatched: {}, \n Source Rows Processed: {}, \n Target
Rows Processed: {}",
- verifiedMappers, mismatchedMappers, chunksVerified, chunksMismatched,
sourceRowsProcessed,
- targetRowsProcessed);
+ taskCreated, verifiedMappers, mismatchedMappers, chunksVerified,
chunksMismatched,
+ sourceRowsProcessed, targetRowsProcessed);
} else {
LOGGER.warn("Unable to retrieve job counters for table {} - job may have
failed "
+ "during initialization", qTable);
@@ -535,6 +546,18 @@ public class PhoenixSyncTableTool extends Configured
implements Tool {
return conf.getBoolean(PHOENIX_SYNC_TABLE_READ_ALL_VERSIONS, false);
}
+ /**
+ * Stores the split-coalescing flag (the {@code --coalesce-split} option) in {@code conf}.
+ * @param conf job configuration to update; must not be null
+ * @param splitCoalescing {@code true} to enable split coalescing
+ * @throws NullPointerException if {@code conf} is null
+ */
+ public static void setPhoenixSyncTableSplitCoalescing(Configuration conf,
+ boolean splitCoalescing) {
+ Configuration checkedConf = Preconditions.checkNotNull(conf);
+ checkedConf.setBoolean(PHOENIX_SYNC_TABLE_SPLIT_COALESCING, splitCoalescing);
+ }
+
+ /**
+ * Reads the split-coalescing flag from {@code conf}.
+ * @param conf job configuration; must not be null
+ * @return the configured flag, or {@link #DEFAULT_PHOENIX_SYNC_TABLE_SPLIT_COALESCING} if unset
+ * @throws NullPointerException if {@code conf} is null
+ */
+ public static boolean getPhoenixSyncTableSplitCoalescing(Configuration conf) {
+ return Preconditions.checkNotNull(conf).getBoolean(PHOENIX_SYNC_TABLE_SPLIT_COALESCING,
+ DEFAULT_PHOENIX_SYNC_TABLE_SPLIT_COALESCING);
+ }
+
public Job getJob() {
return job;
}
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 2bb3d74bf9..10ece383a9 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -392,7 +392,6 @@
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
- <version>1.79</version>
<scope>test</scope>
</dependency>
<!-- logging end -->
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixSyncTableToolIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixSyncTableToolIT.java
index fc277131ab..0661973dbe 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixSyncTableToolIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixSyncTableToolIT.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskCounter;
import
org.apache.phoenix.jdbc.HighAvailabilityTestingUtility.HBaseTestingUtilityPair;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDriver;
@@ -159,6 +160,7 @@ public class PhoenixSyncTableToolIT {
validateSyncCounters(counters, 10, 10, 1, 3);
validateMapperCounters(counters, 1, 3);
+ assertEquals("Expected 4 mapper task to be created", 4,
counters.taskCreated);
List<PhoenixSyncTableCheckpointOutputRow> checkpointEntries =
queryCheckpointTable(sourceConnection, uniqueTableName, targetZkQuorum,
null);
@@ -205,6 +207,8 @@ public class PhoenixSyncTableToolIT {
validateSyncCounters(counters, 10, 7, 7, 3);
validateMapperCounters(counters, 1, 3);
+ assertEquals("Should have only 1 Mapper task created with coalescing", 4,
counters.taskCreated);
+
}
@Test
@@ -1244,10 +1248,8 @@ public class PhoenixSyncTableToolIT {
// Configure paging with aggressive timeouts to force mid-chunk timeouts
Configuration conf = new
Configuration(CLUSTERS.getHBaseCluster1().getConfiguration());
-
- long aggressiveRpcTimeout = 2L;
- conf.setLong(QueryServices.SYNC_TABLE_RPC_TIMEOUT_ATTRIB,
aggressiveRpcTimeout);
- conf.setLong(HConstants.HBASE_RPC_TIMEOUT_KEY, aggressiveRpcTimeout);
+ conf.setBoolean(QueryServices.PHOENIX_SERVER_PAGING_ENABLED_ATTRIB, true);
+ conf.setLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 1);
int chunkSize = 10240;
@@ -1307,15 +1309,10 @@ public class PhoenixSyncTableToolIT {
// Configure paging with aggressive timeouts to force mid-chunk timeouts
Configuration conf = new
Configuration(CLUSTERS.getHBaseCluster1().getConfiguration());
-
- // Enable server-side paging
conf.setBoolean(QueryServices.PHOENIX_SERVER_PAGING_ENABLED_ATTRIB, true);
- // Set extremely short rpc timeout to force frequent paging
- long aggressiveRpcTimeout = 1L; // 1ms RPC timeout
- conf.setLong(QueryServices.SYNC_TABLE_RPC_TIMEOUT_ATTRIB,
aggressiveRpcTimeout);
- conf.setLong(HConstants.HBASE_RPC_TIMEOUT_KEY, aggressiveRpcTimeout);
+ conf.setLong(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, 1);
- int chunkSize = 102400; // 100KB
+ int chunkSize = 10240;
// Create a thread that will perform splits on source cluster during sync
Thread sourceSplitThread = new Thread(() -> {
@@ -1751,6 +1748,29 @@ public class PhoenixSyncTableToolIT {
validateMapperCounters(counters3, 3, 1);
}
+ @Test
+ public void testSyncTableValidateWithSplitCoalescing() throws Exception {
+ // Arrange: replicate the standard dataset, then make the target diverge from the source.
+ setupStandardTestWithReplication(uniqueTableName, 1, 10);
+ introduceAndVerifyTargetDifferences(uniqueTableName);
+
+ // Act: with --coalesce-split every region is expected to be folded into a single map task.
+ SyncCountersResult syncCounters =
+ getSyncCounters(runSyncTool(uniqueTableName, "--coalesce-split"));
+
+ // Assert: job-level task count, then the tool's own sync and mapper counters.
+ assertEquals("Should have only 1 Mapper task created with coalescing", 1,
+ syncCounters.taskCreated);
+ validateSyncCounters(syncCounters, 10, 10, 7, 3);
+ validateMapperCounters(syncCounters, 1, 3);
+
+ // Assert: checkpoint rows written by the job match the expected layout.
+ validateCheckpointEntries(
+ queryCheckpointTable(sourceConnection, uniqueTableName, targetZkQuorum, null),
+ uniqueTableName, targetZkQuorum, 10, 10, 7, 3, 4, 3, null);
+ }
+
/**
* Helper class to hold separated mapper and chunk entries.
*/
@@ -2327,6 +2347,7 @@ public class PhoenixSyncTableToolIT {
LOGGER.info("Split completed for table {} at split point {} (bytes:
{})", tableName, splitId,
Bytes.toStringBinary(splitPoint));
} catch (Exception e) {
+ // Ignore split failures - they don't affect the test's main goal
LOGGER.warn("Failed to split table {} at split point {}: {}", tableName,
splitId,
e.getMessage());
}
@@ -2576,6 +2597,7 @@ public class PhoenixSyncTableToolIT {
public final long chunksVerified;
public final long mappersVerified;
public final long mappersMismatched;
+ public final long taskCreated;
SyncCountersResult(Counters counters) {
this.sourceRowsProcessed =
@@ -2586,6 +2608,7 @@ public class PhoenixSyncTableToolIT {
this.chunksVerified =
counters.findCounter(SyncCounters.CHUNKS_VERIFIED).getValue();
this.mappersVerified =
counters.findCounter(SyncCounters.MAPPERS_VERIFIED).getValue();
this.mappersMismatched =
counters.findCounter(SyncCounters.MAPPERS_MISMATCHED).getValue();
+ this.taskCreated =
counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
}
public void logCounters(String testName) {
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixInputSplitTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixInputSplitTest.java
new file mode 100644
index 0000000000..d780e25b87
--- /dev/null
+++
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixInputSplitTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.KeyRange;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PhoenixInputSplitTest {
+
+ /**
+ * Helper method to create a Scan with given row boundaries (start row inclusive, stop row
+ * exclusive).
+ */
+ private Scan createScan(byte[] startRow, byte[] stopRow) {
+ Scan scan = new Scan();
+ scan.withStartRow(startRow, true);
+ scan.withStopRow(stopRow, false);
+ return scan;
+ }
+
+ /**
+ * Round-trips a split through write(DataOutput) and readFields(DataInput) into a fresh
+ * instance.
+ */
+ private PhoenixInputSplit serializeAndDeserialize(PhoenixInputSplit split) throws IOException {
+ // Serialize
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutput out = new DataOutputStream(baos);
+ split.write(out);
+
+ // Deserialize
+ DataInput in = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
+ PhoenixInputSplit deserialized = new PhoenixInputSplit();
+ deserialized.readFields(in);
+ return deserialized;
+ }
+
+ /**
+ * Asserts that {@code actual} is non-null and has the given lower and upper bounds. Uses
+ * assertArrayEquals so a failure reports the differing bytes; assertTrue(Bytes.equals(...))
+ * would report nothing about the actual values.
+ */
+ private static void assertKeyRange(String message, byte[] expectedLower, byte[] expectedUpper,
+ KeyRange actual) {
+ assertNotNull(message + ": KeyRange should not be null", actual);
+ assertArrayEquals(message + ": lower bound", expectedLower, actual.getLowerRange());
+ assertArrayEquals(message + ": upper bound", expectedUpper, actual.getUpperRange());
+ }
+
+ @Test
+ public void testStandardConstructorWithSingleScan() {
+ List<Scan> scans = new ArrayList<>();
+ scans.add(createScan(Bytes.toBytes("a"), Bytes.toBytes("d")));
+
+ PhoenixInputSplit split = new PhoenixInputSplit(scans);
+
+ assertFalse("Should not be coalesced with single scan", split.isCoalesced());
+ assertEquals("Should have 1 keyRange", 1, split.getKeyRanges().size());
+ assertKeyRange("KeyRange should span scan boundaries", Bytes.toBytes("a"), Bytes.toBytes("d"),
+ split.getKeyRange());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testStandardConstructorWithNullScans() {
+ new PhoenixInputSplit(null);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testStandardConstructorWithEmptyScans() {
+ new PhoenixInputSplit(Collections.emptyList());
+ }
+
+ @Test
+ public void testCoalescingConstructorWithMultipleRegions()
+ throws IOException, InterruptedException {
+ List<Scan> scans = new ArrayList<>();
+ scans.add(createScan(Bytes.toBytes("a"), Bytes.toBytes("d")));
+ scans.add(createScan(Bytes.toBytes("d"), Bytes.toBytes("g")));
+ scans.add(createScan(Bytes.toBytes("g"), Bytes.toBytes("j")));
+
+ long splitSize = 3072;
+ String regionLocation = "server1:16020";
+
+ // KeyRanges are derived from the scans by the constructor
+ PhoenixInputSplit split = new PhoenixInputSplit(scans, splitSize, regionLocation);
+
+ assertTrue("Should be coalesced with multiple regions", split.isCoalesced());
+ assertEquals("Should have 3 keyRanges (derived from scans)", 3, split.getKeyRanges().size());
+ assertEquals("Split size should match", splitSize, split.getLength());
+ assertArrayEquals("Region location should match", new String[] { regionLocation },
+ split.getLocations());
+
+ // Verify keyRanges were derived correctly from scans
+ List<KeyRange> keyRanges = split.getKeyRanges();
+ assertKeyRange("First keyRange should match first scan", Bytes.toBytes("a"),
+ Bytes.toBytes("d"), keyRanges.get(0));
+ assertKeyRange("Second keyRange should match second scan", Bytes.toBytes("d"),
+ Bytes.toBytes("g"), keyRanges.get(1));
+ assertKeyRange("Third keyRange should match third scan", Bytes.toBytes("g"),
+ Bytes.toBytes("j"), keyRanges.get(2));
+ }
+
+ @Test
+ public void testSerializationWithSingleRegion() throws IOException, InterruptedException {
+ List<Scan> scans = new ArrayList<>();
+ scans.add(createScan(Bytes.toBytes("a"), Bytes.toBytes("d")));
+
+ PhoenixInputSplit original = new PhoenixInputSplit(scans, 1024, "server1:16020");
+
+ PhoenixInputSplit deserialized = serializeAndDeserialize(original);
+
+ assertFalse("Should not be coalesced after deserialization", deserialized.isCoalesced());
+ assertEquals("Should have 1 keyRange", 1, deserialized.getKeyRanges().size());
+ assertEquals("Split size should match", original.getLength(), deserialized.getLength());
+ assertArrayEquals("Region location should match", original.getLocations(),
+ deserialized.getLocations());
+ assertKeyRange("KeyRange should match", original.getKeyRange().getLowerRange(),
+ original.getKeyRange().getUpperRange(), deserialized.getKeyRange());
+ }
+
+ @Test
+ public void testSerializationWithCoalescedSplit() throws IOException, InterruptedException {
+ List<Scan> scans = new ArrayList<>();
+ scans.add(createScan(Bytes.toBytes("a"), Bytes.toBytes("d")));
+ scans.add(createScan(Bytes.toBytes("d"), Bytes.toBytes("g")));
+ scans.add(createScan(Bytes.toBytes("g"), Bytes.toBytes("j")));
+
+ // KeyRanges are derived from the scans by the constructor
+ PhoenixInputSplit original = new PhoenixInputSplit(scans, 3072, "server1:16020");
+
+ PhoenixInputSplit deserialized = serializeAndDeserialize(original);
+
+ assertTrue("Should be coalesced after deserialization", deserialized.isCoalesced());
+ assertEquals("Should have 3 keyRanges", 3, deserialized.getKeyRanges().size());
+ assertEquals("Split size should match", original.getLength(), deserialized.getLength());
+ assertArrayEquals("Region location should match", original.getLocations(),
+ deserialized.getLocations());
+
+ // Verify all keyRanges are preserved, in order
+ List<KeyRange> originalKeyRanges = original.getKeyRanges();
+ List<KeyRange> deserializedKeyRanges = deserialized.getKeyRanges();
+ for (int i = 0; i < originalKeyRanges.size(); i++) {
+ assertKeyRange("KeyRange " + i + " should match", originalKeyRanges.get(i).getLowerRange(),
+ originalKeyRanges.get(i).getUpperRange(), deserializedKeyRanges.get(i));
+ }
+ }
+
+ @Test
+ public void testGetLocations() throws IOException, InterruptedException {
+ List<Scan> scans =
+ Collections.singletonList(createScan(Bytes.toBytes("a"), Bytes.toBytes("d")));
+
+ // Test with null regionLocation
+ PhoenixInputSplit split1 = new PhoenixInputSplit(scans, 1024, null);
+ assertArrayEquals("Should return empty array for null location", new String[] {},
+ split1.getLocations());
+
+ // Test with valid regionLocation
+ PhoenixInputSplit split2 = new PhoenixInputSplit(scans, 1024, "server1:16020");
+ assertArrayEquals("Should return array with server location", new String[] { "server1:16020" },
+ split2.getLocations());
+ }
+}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormatTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormatTest.java
index 15e643feaf..95adb365a0 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormatTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixSyncTableInputFormatTest.java
@@ -18,27 +18,46 @@
package org.apache.phoenix.mapreduce;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.KeyRange;
+import org.junit.Before;
import org.junit.Test;
/**
* Unit tests for PhoenixSyncTableInputFormat. Tests various scenarios of
filtering completed splits
+ * and split coalescing functionality.
*/
public class PhoenixSyncTableInputFormatTest {
private PhoenixSyncTableInputFormat inputFormat = new
PhoenixSyncTableInputFormat();
+ /** Mocked Phoenix query services handed to coalesceSplits; re-created in {@link #setup()}. */
+ private ConnectionQueryServices mockQueryServices;
+ /** Physical table name handed to coalesceSplits; never reassigned. */
+ private final byte[] physicalTableName = Bytes.toBytes("TEST_TABLE");
+
+ /** Creates a fresh ConnectionQueryServices mock before each test. */
+ @Before
+ public void setup() throws Exception {
+ mockQueryServices = mock(ConnectionQueryServices.class);
+ }
+
/**
* Helper method to create a PhoenixInputSplit with given key range
boundaries.
*/
@@ -280,4 +299,289 @@ public class PhoenixSyncTableInputFormatTest {
assertTrue("Should return a PhoenixNoOpSingleRecordReader",
reader instanceof PhoenixNoOpSingleRecordReader);
}
+
+ @Test
+ public void testCoalesceSplitsWithSingleServer() throws Exception {
+ // Three adjacent splits, all reported on the same region server.
+ List<InputSplit> inputSplits = new ArrayList<>();
+ inputSplits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d")));
+ inputSplits.add(createSplit(Bytes.toBytes("d"), Bytes.toBytes("g")));
+ inputSplits.add(createSplit(Bytes.toBytes("g"), Bytes.toBytes("j")));
+
+ // Build the location mock first, then stub every lookup to return it.
+ HRegionLocation server1Location =
+ createMockRegionLocation("server1:16020", Bytes.toBytes("a"));
+ when(mockQueryServices.getTableRegionLocation(any(byte[].class), any(byte[].class)))
+ .thenReturn(server1Location);
+
+ List<InputSplit> coalesced =
+ inputFormat.coalesceSplits(inputSplits, mockQueryServices, physicalTableName);
+
+ // One server means one output split carrying all three KeyRanges in start-key order.
+ assertEquals("Should have 1 coalesced split (all on same server)", 1, coalesced.size());
+ PhoenixInputSplit onlySplit = (PhoenixInputSplit) coalesced.get(0);
+ assertTrue("Split should be coalesced", onlySplit.isCoalesced());
+ List<KeyRange> ranges = onlySplit.getKeyRanges();
+ assertEquals("Split should have 3 KeyRanges", 3, ranges.size());
+ assertTrue("First KeyRange should start with 'a'",
+ Bytes.equals(Bytes.toBytes("a"), ranges.get(0).getLowerRange()));
+ assertTrue("Second KeyRange should start with 'd'",
+ Bytes.equals(Bytes.toBytes("d"), ranges.get(1).getLowerRange()));
+ assertTrue("Third KeyRange should start with 'g'",
+ Bytes.equals(Bytes.toBytes("g"), ranges.get(2).getLowerRange()));
+ }
+
+ @Test
+ public void testCoalesceSplitsWithMultipleServers() throws Exception {
+ // Create 6 PhoenixInputSplits
+ List<InputSplit> splits = new ArrayList<>();
+ splits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("c")));
+ splits.add(createSplit(Bytes.toBytes("c"), Bytes.toBytes("e")));
+ splits.add(createSplit(Bytes.toBytes("e"), Bytes.toBytes("g")));
+ splits.add(createSplit(Bytes.toBytes("g"), Bytes.toBytes("i")));
+ splits.add(createSplit(Bytes.toBytes("i"), Bytes.toBytes("k")));
+ splits.add(createSplit(Bytes.toBytes("k"), Bytes.toBytes("m")));
+
+ // Create mock region locations BEFORE stubbing to avoid nested stubbing issues
+ HRegionLocation mockRegionA = createMockRegionLocation("server1:16020", Bytes.toBytes("a"));
+ HRegionLocation mockRegionC = createMockRegionLocation("server1:16020", Bytes.toBytes("c"));
+ HRegionLocation mockRegionE = createMockRegionLocation("server1:16020", Bytes.toBytes("e"));
+ HRegionLocation mockRegionG = createMockRegionLocation("server2:16020", Bytes.toBytes("g"));
+ HRegionLocation mockRegionI = createMockRegionLocation("server2:16020", Bytes.toBytes("i"));
+ HRegionLocation mockRegionK = createMockRegionLocation("server2:16020", Bytes.toBytes("k"));
+
+ // Mock ConnectionQueryServices: first 3 splits -> server1, last 3 splits -> server2
+ when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("a")))
+ .thenReturn(mockRegionA);
+ when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("c")))
+ .thenReturn(mockRegionC);
+ when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("e")))
+ .thenReturn(mockRegionE);
+ when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("g")))
+ .thenReturn(mockRegionG);
+ when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("i")))
+ .thenReturn(mockRegionI);
+ when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("k")))
+ .thenReturn(mockRegionK);
+
+ List<InputSplit> result =
+ inputFormat.coalesceSplits(splits, mockQueryServices, physicalTableName);
+
+ // Verify: 2 coalesced splits (one per server)
+ assertEquals("Should have 2 coalesced splits (one per server)", 2, result.size());
+
+ // The order of the per-server output splits is not asserted, so identify each split by its
+ // location and check it holds exactly that server's regions, sorted by start key. Checking
+ // sortedness alone would not catch a region being grouped under the wrong server.
+ List<String> seenLocations = new ArrayList<>();
+ for (InputSplit inputSplit : result) {
+ PhoenixInputSplit split = (PhoenixInputSplit) inputSplit;
+ String location = split.getLocations()[0];
+ seenLocations.add(location);
+
+ String[] expectedStarts;
+ if ("server1:16020".equals(location)) {
+ expectedStarts = new String[] { "a", "c", "e" };
+ } else if ("server2:16020".equals(location)) {
+ expectedStarts = new String[] { "g", "i", "k" };
+ } else {
+ throw new AssertionError("Unexpected split location: " + location);
+ }
+
+ assertTrue("Split on " + location + " should be coalesced", split.isCoalesced());
+ List<KeyRange> keyRanges = split.getKeyRanges();
+ assertEquals("Split on " + location + " should have 3 KeyRanges", 3, keyRanges.size());
+ for (int i = 0; i < expectedStarts.length; i++) {
+ assertTrue(
+ "KeyRange " + i + " on " + location + " should start with '" + expectedStarts[i] + "'",
+ Bytes.equals(Bytes.toBytes(expectedStarts[i]), keyRanges.get(i).getLowerRange()));
+ }
+ }
+ assertTrue("Should have a server1 split", seenLocations.contains("server1:16020"));
+ assertTrue("Should have a server2 split", seenLocations.contains("server2:16020"));
+ }
+
+ @Test
+ public void testCoalesceSplitsWithEmptyList() throws Exception {
+ // Edge case: coalescing an empty input must produce an empty output.
+ List<InputSplit> noSplits = new ArrayList<>();
+ List<InputSplit> coalesced =
+ inputFormat.coalesceSplits(noSplits, mockQueryServices, physicalTableName);
+ assertEquals("Should return empty list for empty input", 0, coalesced.size());
+ }
+
+ @Test
+ public void testCoalesceSplitsWithSingleSplit() throws Exception {
+ // Edge case: a lone split must not be flagged as coalesced.
+ List<InputSplit> inputSplits = new ArrayList<>();
+ inputSplits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d")));
+
+ HRegionLocation server1Location =
+ createMockRegionLocation("server1:16020", Bytes.toBytes("a"));
+ when(mockQueryServices.getTableRegionLocation(any(byte[].class), any(byte[].class)))
+ .thenReturn(server1Location);
+
+ List<InputSplit> coalesced =
+ inputFormat.coalesceSplits(inputSplits, mockQueryServices, physicalTableName);
+
+ assertEquals("Should return 1 split", 1, coalesced.size());
+ PhoenixInputSplit onlySplit = (PhoenixInputSplit) coalesced.get(0);
+ assertFalse("Single split should not be marked as coalesced", onlySplit.isCoalesced());
+ assertEquals("Should have 1 KeyRange", 1, onlySplit.getKeyRanges().size());
+ }
+
+ @Test
+ public void testCoalesceSplitsWithNullRegionLocationFallsBackToUnknownServer()
+ throws Exception {
+ // No location at all (e.g. region in transition): the split lands in the UNKNOWN_SERVER
+ // bucket instead of failing the job.
+ List<InputSplit> inputSplits = new ArrayList<>();
+ inputSplits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d")));
+ when(mockQueryServices.getTableRegionLocation(any(byte[].class), any(byte[].class)))
+ .thenReturn(null);
+
+ List<InputSplit> coalesced =
+ inputFormat.coalesceSplits(inputSplits, mockQueryServices, physicalTableName);
+
+ assertEquals("Should return 1 split assigned to UNKNOWN_SERVER bucket", 1, coalesced.size());
+ String location = ((PhoenixInputSplit) coalesced.get(0)).getLocations()[0];
+ assertEquals("Split location should be UNKNOWN_SERVER",
+ PhoenixSyncTableInputFormat.UNKNOWN_SERVER, location);
+ }
+
+ @Test
+ public void testCoalesceSplitsWithNullServerNameFallsBackToUnknownServer()
+ throws Exception {
+ // A location is returned but carries no server name: same UNKNOWN_SERVER fallback expected.
+ List<InputSplit> inputSplits = new ArrayList<>();
+ inputSplits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d")));
+
+ HRegionLocation serverlessLocation = mock(HRegionLocation.class);
+ when(serverlessLocation.getServerName()).thenReturn(null);
+ when(mockQueryServices.getTableRegionLocation(any(byte[].class), any(byte[].class)))
+ .thenReturn(serverlessLocation);
+
+ List<InputSplit> coalesced =
+ inputFormat.coalesceSplits(inputSplits, mockQueryServices, physicalTableName);
+
+ assertEquals("Should return 1 split assigned to UNKNOWN_SERVER bucket", 1, coalesced.size());
+ String location = ((PhoenixInputSplit) coalesced.get(0)).getLocations()[0];
+ assertEquals("Split location should be UNKNOWN_SERVER",
+ PhoenixSyncTableInputFormat.UNKNOWN_SERVER, location);
+ }
+
+ @Test
+ public void testCoalesceSplitsMultipleNullLocationsCoalescedIntoOneUnknownSplit()
+ throws Exception {
+ // Several unlocatable regions share the single UNKNOWN_SERVER bucket.
+ List<InputSplit> inputSplits = new ArrayList<>();
+ inputSplits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d")));
+ inputSplits.add(createSplit(Bytes.toBytes("d"), Bytes.toBytes("g")));
+ inputSplits.add(createSplit(Bytes.toBytes("g"), Bytes.toBytes("j")));
+ when(mockQueryServices.getTableRegionLocation(any(byte[].class), any(byte[].class)))
+ .thenReturn(null);
+
+ List<InputSplit> coalesced =
+ inputFormat.coalesceSplits(inputSplits, mockQueryServices, physicalTableName);
+
+ assertEquals("All RIT splits should be coalesced into one UNKNOWN_SERVER split", 1,
+ coalesced.size());
+ PhoenixInputSplit unknownBucket = (PhoenixInputSplit) coalesced.get(0);
+ assertTrue("UNKNOWN_SERVER split should be marked as coalesced",
+ unknownBucket.isCoalesced());
+ assertEquals("Should contain all 3 KeyRanges", 3, unknownBucket.getKeyRanges().size());
+ }
+
+ @Test
+ public void testCoalesceSplitsMixedValidAndNullLocations() throws Exception {
+ // Two regions on server1 plus two regions with no location (RIT). Expect one coalesced split
+ // per bucket: server1 and UNKNOWN_SERVER. This mirrors a region split mid-job, where only a
+ // subset of regions is temporarily unlocatable while the rest are healthy.
+ List<InputSplit> inputSplits = new ArrayList<>();
+ inputSplits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d")));
+ inputSplits.add(createSplit(Bytes.toBytes("d"), Bytes.toBytes("g")));
+ inputSplits.add(createSplit(Bytes.toBytes("g"), Bytes.toBytes("j")));
+ inputSplits.add(createSplit(Bytes.toBytes("j"), Bytes.toBytes("m")));
+
+ HRegionLocation server1Location =
+ createMockRegionLocation("server1:16020", Bytes.toBytes("a"));
+ when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("a")))
+ .thenReturn(server1Location);
+ when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("d")))
+ .thenReturn(server1Location);
+ when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("g")))
+ .thenReturn(null);
+ when(mockQueryServices.getTableRegionLocation(physicalTableName, Bytes.toBytes("j")))
+ .thenReturn(null);
+
+ List<InputSplit> coalesced =
+ inputFormat.coalesceSplits(inputSplits, mockQueryServices, physicalTableName);
+
+ assertEquals("Should produce 2 splits: server1 bucket + UNKNOWN_SERVER bucket", 2,
+ coalesced.size());
+
+ // Both buckets must be present.
+ List<String> locations = new ArrayList<>();
+ for (InputSplit each : coalesced) {
+ locations.add(((PhoenixInputSplit) each).getLocations()[0]);
+ }
+ assertTrue("Should have a server1 split", locations.contains("server1:16020"));
+ assertTrue("Should have an UNKNOWN_SERVER split",
+ locations.contains(PhoenixSyncTableInputFormat.UNKNOWN_SERVER));
+
+ // Each bucket holds two KeyRanges.
+ for (InputSplit each : coalesced) {
+ PhoenixInputSplit phoenixSplit = (PhoenixInputSplit) each;
+ boolean onServer1 = "server1:16020".equals(phoenixSplit.getLocations()[0]);
+ int rangeCount = phoenixSplit.getKeyRanges().size();
+ if (onServer1) {
+ assertEquals("server1 bucket should have 2 KeyRanges", 2, rangeCount);
+ } else {
+ assertEquals("UNKNOWN_SERVER bucket should have 2 KeyRanges", 2, rangeCount);
+ }
+ }
+ }
+
+ @Test
+ public void testCoalesceSplitsFailureThrowsForGenuineClusterError()
+ throws Exception {
+ // Missing locations fall back to UNKNOWN_SERVER, but a real SQLException from the cluster
+ // must propagate to the caller unchanged.
+ List<InputSplit> inputSplits = new ArrayList<>();
+ inputSplits.add(createSplit(Bytes.toBytes("a"), Bytes.toBytes("d")));
+ when(mockQueryServices.getTableRegionLocation(any(byte[].class), any(byte[].class)))
+ .thenThrow(new SQLException("Simulated RegionServer communication failure"));
+
+ try {
+ inputFormat.coalesceSplits(inputSplits, mockQueryServices, physicalTableName);
+ fail("Expected SQLException to be thrown for a genuine cluster error");
+ } catch (SQLException expected) {
+ String message = expected.getMessage();
+ assertTrue("Exception message should be preserved",
+ message.contains("Simulated RegionServer communication failure"));
+ }
+ }
+
+ /**
+ * Builds a mocked HRegionLocation whose ServerName reports the given "host[:port]" address.
+ * The port defaults to 16020 when {@code serverAddress} has no ":port" part.
+ * @param serverAddress host, optionally followed by ":port"
+ * @param startKey region start key; currently not read by this helper (kept so call sites
+ * document which region each mock stands for)
+ * @return a mock whose getServerName().getAddress() yields a real Address for the host/port
+ */
+ private HRegionLocation createMockRegionLocation(String serverAddress, byte[] startKey) {
+ String[] hostAndPort = serverAddress.split(":");
+ int port = 16020;
+ if (hostAndPort.length > 1) {
+ port = Integer.parseInt(hostAndPort[1]);
+ }
+ // Use a real Address rather than a mock, so its string form is HBase's own host:port text.
+ org.apache.hadoop.hbase.net.Address address =
+ org.apache.hadoop.hbase.net.Address.fromParts(hostAndPort[0], port);
+
+ HRegionLocation regionLocation = mock(HRegionLocation.class);
+ ServerName serverName = mock(ServerName.class);
+ when(serverName.getAddress()).thenReturn(address);
+ when(regionLocation.getServerName()).thenReturn(serverName);
+ return regionLocation;
+ }
}
diff --git a/pom.xml b/pom.xml
index 1205d5f158..4e28795841 100644
--- a/pom.xml
+++ b/pom.xml
@@ -136,6 +136,7 @@
<junit.version>4.13.1</junit.version>
<hdrhistogram.version>2.1.12</hdrhistogram.version>
<byte-buddy.version>1.15.11</byte-buddy.version>
+ <bcprov-jdk18on.version>1.79</bcprov-jdk18on.version>
<!-- These are only used for exclusion when shading, and the exact version
is completely
irrelevant, but we need to keep them up to date to appease static
checkers. While Phoenix does
@@ -1361,6 +1362,11 @@
<artifactId>jcodings</artifactId>
<version>${jcodings.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk18on</artifactId>
+ <version>${bcprov-jdk18on.version}</version>
+ </dependency>
<dependency>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>