This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit fc83f5e5a2436ec3837077021ec7ed0557015440
Author: tkhurana <[email protected]>
AuthorDate: Sun Jun 7 09:31:08 2026 -0700

    PHOENIX-7877 Addendum ReplicationLogWriter metrics: rename, units, 
decomposition support
    
    Make the replication metrics source produce a clean, decomposable
    sync-time signal for production analysis.
    
    Naming and units:
    - Rename JMX metric strings with phoenixWAL prefix to avoid collision
      with HBase MetricsWALSource (appendTime/syncTime) on flattened
      dashboards.
    - Encode the stored unit in the metric-name suffix (Ms or Ns) so JMX
      consumers cannot misinterpret values.
    - Convert ms-unit metrics (sync, fsSync, rotation) from ns input to ms
      storage at the impl boundary so HBase WAL comparisons are unit-aligned.
    - Keep ns precision for sub-millisecond metrics (append, ringBuffer,
      pendingSyncWait) where ms truncation would lose all signal.
    
    Histogram types:
    - Switch ms-unit time histograms to MutableTimeHistogram so JMX exports
      TimeRangeCount_* bucket counts (correct distribution merging is then
      possible).
    - Switch batchSize and pendingSyncCount to MutableSizeHistogram for the
      same reason on count-shaped metrics.
    - Keep ns-unit histograms as plain MutableHistogram (range buckets are
      ms-keyed and don't apply to ns values).
    
    Decomposition support:
    - Gate updateRingBufferTime to SYNC events only. ringBufferTime now
      measures producer-side queue + drain-ahead specifically, which is
      what the syncTime decomposition equation needs.
    - Add p50/p99 fields to ReplicationLogMetricValues for the four time
      components used in the decomposition (sync, fsSync, ringBuffer,
      pendingSyncWait). The DTO is consumed only from tests; populated via
      HistogramImpl.snapshot() reached through reflection.
    - Rename DTO getters/builder methods with Max suffix where they expose
      histogram max, leaving counter and percentile getters as-is.
    
    ReplicationSyncTime placement:
    - Move the timing block in IndexRegionObserver from the call site into
      replicateMutations, after the early-return guards. The metric is now
      recorded only when replication actually does work (replication
      enabled, HA group present, not in test-skip mode), so percentiles
      reflect real replication latency rather than being diluted by
      zero-cost no-op samples.
    
    Test:
    - Add MetricsReplicationLogGroupSourceImplTest covering verbatim ns
      recording, ns->ms conversion, sub-ms truncation, max-across-samples,
      counters, and size histograms.
    - Update simulator and IT callers to use the renamed getters and to
      compare the decomposition equation in nanoseconds (with a 1ms
      tolerance for fsSync ms-floor truncation).
    - Gate testReplicationSyncPathSimulator behind -Dtest.runSimulator=true
      so it doesn't run in default CI.
---
 .../phoenix/hbase/index/IndexRegionObserver.java   |  71 ++++----
 .../phoenix/replication/ReplicationLogGroup.java   |   7 +-
 .../metrics/MetricsReplicationLogGroupSource.java  |  46 ++---
 .../MetricsReplicationLogGroupSourceImpl.java      |  93 +++++++---
 .../metrics/ReplicationLogMetricValues.java        | 195 ++++++++++++++++-----
 .../phoenix/replication/ReplicationLogGroupIT.java |  26 +--
 .../replication/ReplicationLogGroupTest.java       | 152 ++++++++++++++++
 .../MetricsReplicationLogGroupSourceImplTest.java  | 105 +++++++++++
 8 files changed, 550 insertions(+), 145 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 2547719695..c7825ce96c 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -2319,13 +2319,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
                      // updates
         CompletableFuture<Void> postIndexFuture =
           CompletableFuture.runAsync(() -> doPost(c, context));
-        long start = EnvironmentEdgeManager.currentTimeMillis();
-        try {
-          replicateMutations(c.getEnvironment(), miniBatchOp, context);
-        } finally {
-          long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
-          metricSource.updateReplicationSyncTime(dataTableName, duration);
-        }
+        replicateMutations(c.getEnvironment(), miniBatchOp, context);
         FutureUtils.get(postIndexFuture);
       }
     } finally {
@@ -2942,40 +2936,47 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     }
     ReplicationLogGroup group = logGroup.get();
 
-    for (int i = 0; i < miniBatchOp.size(); i++) {
-      Mutation m = miniBatchOp.getOperation(i);
-      if (this.ignoreReplicationFilter.test(m)) {
-        continue;
-      }
-      // When coprocessors add cells (local index, conditional TTL, ON 
DUPLICATE KEY UPDATE),
-      // HBase merges them into the data mutation which can mix row keys and 
cell types.
-      // Split those back into individual Put/Delete mutations for correct 
serialization.
-      if (miniBatchOp.getOperationsFromCoprocessors(i) == null) {
-        group.append(this.dataTableName, -1, m);
-      } else {
-        for (Mutation split : splitCellsIntoMutations(m)) {
-          group.append(this.dataTableName, -1, split);
+    // Record ReplicationSyncTime only when we are actually doing work (not on 
early-return paths).
+    long start = EnvironmentEdgeManager.currentTimeMillis();
+    try {
+      for (int i = 0; i < miniBatchOp.size(); i++) {
+        Mutation m = miniBatchOp.getOperation(i);
+        if (this.ignoreReplicationFilter.test(m)) {
+          continue;
+        }
+        // When coprocessors add cells (local index, conditional TTL, ON 
DUPLICATE KEY UPDATE),
+        // HBase merges them into the data mutation which can mix row keys and 
cell types.
+        // Split those back into individual Put/Delete mutations for correct 
serialization.
+        if (miniBatchOp.getOperationsFromCoprocessors(i) == null) {
+          group.append(this.dataTableName, -1, m);
+        } else {
+          for (Mutation split : splitCellsIntoMutations(m)) {
+            group.append(this.dataTableName, -1, split);
+          }
         }
       }
-    }
-    if (context.preIndexUpdates != null) {
-      for (Map.Entry<HTableInterfaceReference, Mutation> entry : 
context.preIndexUpdates
-        .entries()) {
-        if (this.ignoreReplicationFilter.test(entry.getValue())) {
-          continue;
+      if (context.preIndexUpdates != null) {
+        for (Map.Entry<HTableInterfaceReference, Mutation> entry : 
context.preIndexUpdates
+          .entries()) {
+          if (this.ignoreReplicationFilter.test(entry.getValue())) {
+            continue;
+          }
+          group.append(entry.getKey().getTableName(), -1, entry.getValue());
         }
-        group.append(entry.getKey().getTableName(), -1, entry.getValue());
       }
-    }
-    if (context.postIndexUpdates != null) {
-      for (Map.Entry<HTableInterfaceReference, Mutation> entry : 
context.postIndexUpdates
-        .entries()) {
-        if (this.ignoreReplicationFilter.test(entry.getValue())) {
-          continue;
+      if (context.postIndexUpdates != null) {
+        for (Map.Entry<HTableInterfaceReference, Mutation> entry : 
context.postIndexUpdates
+          .entries()) {
+          if (this.ignoreReplicationFilter.test(entry.getValue())) {
+            continue;
+          }
+          group.append(entry.getKey().getTableName(), -1, entry.getValue());
         }
-        group.append(entry.getKey().getTableName(), -1, entry.getValue());
       }
+      group.sync();
+    } finally {
+      long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+      metricSource.updateReplicationSyncTime(this.dataTableName, duration);
     }
-    group.sync();
   }
 }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index 0d7a9f42e4..0a862d640a 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -1169,8 +1169,11 @@ public class ReplicationLogGroup {
     @Override
     public void onEvent(LogEvent event, long sequence, boolean endOfBatch) 
throws Exception {
       long currentTimeNs = System.nanoTime();
-      long ringBufferTimeNs = currentTimeNs - event.timestampNs;
-      metrics.updateRingBufferTime(ringBufferTimeNs);
+      // Record ring-buffer wait only for SYNC events (queue + drain ahead). 
Producers are blocked
+      // on sync; data-event waits are not directly observable to a caller.
+      if (event.type == EVENT_TYPE_SYNC) {
+        metrics.updateRingBufferTime(currentTimeNs - event.timestampNs);
+      }
       batchEventCount++;
       if (fatalException != null) {
         // Append events are ignored; sync futures are failed immediately
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
index 6c0392f2b7..13dd680998 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
@@ -27,41 +27,41 @@ public interface MetricsReplicationLogGroupSource extends 
BaseSource {
   String METRICS_DESCRIPTION = "Metrics about Replication Log Operations for 
an HA Group";
   String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
 
-  String ROTATION_COUNT = "rotationCount";
+  String ROTATION_COUNT = "phoenixWALRotationCount";
   String ROTATION_COUNT_DESC = "Total number of times rotateLog was called";
 
-  String ROTATION_FAILURES = "rotationFailures";
-
+  String ROTATION_FAILURES = "phoenixWALRotationFailures";
   String ROTATION_FAILURES_DESC = "Number of times log rotation has failed";
 
-  // All time histograms in this source are nanoseconds.
-  String APPEND_TIME = "appendTime";
+  // Time histograms encode the unit in the name suffix (Ms or Ns) so 
consumers cannot misinterpret.
+  String APPEND_TIME = "phoenixWALAppendTimeNs";
   String APPEND_TIME_DESC = "Histogram of time taken for append operations in 
nanoseconds";
 
-  String SYNC_TIME = "syncTime";
-  String SYNC_TIME_DESC = "Histogram of time taken for sync operations in 
nanoseconds";
+  String SYNC_TIME = "phoenixWALSyncTimeMs";
+  String SYNC_TIME_DESC = "Histogram of time taken for sync operations in 
milliseconds";
 
-  String ROTATION_TIME = "rotationTime";
-  String ROTATION_TIME_DESC = "Histogram of time taken for log rotations in 
nanoseconds";
+  String ROTATION_TIME = "phoenixWALRotationTimeMs";
+  String ROTATION_TIME_DESC = "Histogram of time taken for log rotations in 
milliseconds";
 
-  String RING_BUFFER_TIME = "ringBufferTime";
-  String RING_BUFFER_TIME_DESC = "Time events spend in the ring buffer in 
nanoseconds";
+  String RING_BUFFER_TIME = "phoenixWALSyncRingBufferTimeNs";
+  String RING_BUFFER_TIME_DESC =
+    "Time SYNC events spend in the ring buffer (queue + drain ahead) in 
nanoseconds";
 
-  String FS_SYNC_TIME = "fsSyncTime";
+  String FS_SYNC_TIME = "phoenixWALFsSyncTimeMs";
   String FS_SYNC_TIME_DESC =
-    "Histogram of time taken for the underlying filesystem sync (fsync) in 
nanoseconds";
+    "Histogram of time taken for the underlying filesystem sync (fsync) in 
milliseconds";
 
-  String BATCH_SIZE = "batchSize";
+  String BATCH_SIZE = "phoenixWALBatchSize";
   String BATCH_SIZE_DESC = "Histogram of number of events drained per 
Disruptor batch";
 
-  String PENDING_SYNC_COUNT = "pendingSyncCount";
+  String PENDING_SYNC_COUNT = "phoenixWALPendingSyncCount";
   String PENDING_SYNC_COUNT_DESC = "Histogram of pending sync futures 
coalesced into one fsync";
 
-  String PENDING_SYNC_WAIT_TIME = "pendingSyncWaitTime";
+  String PENDING_SYNC_WAIT_TIME = "phoenixWALPendingSyncWaitTimeNs";
   String PENDING_SYNC_WAIT_TIME_DESC =
     "Time a SYNC event waits between consumer pickup and fsync start, in 
nanoseconds";
 
-  String SYNC_TO_SAF_TRANSITIONS = "syncToSafTransitions";
+  String SYNC_TO_SAF_TRANSITIONS = "SyncToSafTransitions";
   String SYNC_TO_SAF_TRANSITIONS_DESC = "Number of SYNC to STORE_AND_FORWARD 
mode transitions";
 
   /**
@@ -71,31 +71,33 @@ public interface MetricsReplicationLogGroupSource extends 
BaseSource {
   void incrementRotationCount();
 
   /**
-   * Update the time taken for an append operation in nanoseconds.
+   * Update the time taken for an append operation. Recorded into histogram in 
nanoseconds.
    * @param timeNs Time taken in nanoseconds
    */
   void updateAppendTime(long timeNs);
 
   /**
-   * Update the time taken for a sync operation in nanoseconds.
+   * Update the time taken for a sync operation. Recorded into histogram in 
milliseconds.
    * @param timeNs Time taken in nanoseconds
    */
   void updateSyncTime(long timeNs);
 
   /**
-   * Update the time taken for a rotation operation in nanoseconds.
+   * Update the time taken for a rotation operation. Recorded into histogram 
in milliseconds.
    * @param timeNs Time taken in nanoseconds
    */
   void updateRotationTime(long timeNs);
 
   /**
-   * Update the time an event spent in the ring buffer in nanoseconds.
+   * Update the time a SYNC event spent in the ring buffer (queue + drain 
ahead). Recorded into
+   * histogram in nanoseconds.
    * @param timeNs Time spent in nanoseconds
    */
   void updateRingBufferTime(long timeNs);
 
   /**
-   * Update the time taken for the underlying filesystem sync (fsync) in 
nanoseconds.
+   * Update the time taken for the underlying filesystem sync (fsync). 
Recorded into histogram in
+   * milliseconds.
    * @param timeNs Time taken in nanoseconds
    */
   void updateFsSyncTime(long timeNs);
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
index 7fe679075d..4b0ab4fd84 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
@@ -17,10 +17,16 @@
  */
 package org.apache.phoenix.replication.metrics;
 
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.hbase.metrics.Snapshot;
+import org.apache.hadoop.hbase.metrics.impl.HistogramImpl;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MutableFastCounter;
 import org.apache.hadoop.metrics2.lib.MutableHistogram;
+import org.apache.hadoop.metrics2.lib.MutableSizeHistogram;
+import org.apache.hadoop.metrics2.lib.MutableTimeHistogram;
 
 /** Implementation of metrics source for ReplicationLog operations. */
 public class MetricsReplicationLogGroupSourceImpl extends BaseSourceImpl
@@ -29,14 +35,14 @@ public class MetricsReplicationLogGroupSourceImpl extends 
BaseSourceImpl
   private final MutableFastCounter rotationCount;
   private final MutableFastCounter rotationFailuresCount;
   private final MutableFastCounter syncToSafTransitions;
-  private final MutableHistogram appendTime;
-  private final MutableHistogram syncTime;
-  private final MutableHistogram rotationTime;
-  private final MutableHistogram ringBufferTime;
-  private final MutableHistogram fsSyncTime;
-  private final MutableHistogram batchSize;
-  private final MutableHistogram pendingSyncCount;
-  private final MutableHistogram pendingSyncWaitTime;
+  private final MutableHistogram appendTimeNs;
+  private final MutableTimeHistogram syncTimeMs;
+  private final MutableTimeHistogram rotationTimeMs;
+  private final MutableHistogram ringBufferTimeNs;
+  private final MutableTimeHistogram fsSyncTimeMs;
+  private final MutableSizeHistogram batchSize;
+  private final MutableSizeHistogram pendingSyncCount;
+  private final MutableHistogram pendingSyncWaitTimeNs;
 
   public MetricsReplicationLogGroupSourceImpl(String haGroupName) {
     this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, 
METRICS_JMX_CONTEXT, haGroupName);
@@ -51,15 +57,15 @@ public class MetricsReplicationLogGroupSourceImpl extends 
BaseSourceImpl
       getMetricsRegistry().newCounter(ROTATION_FAILURES, 
ROTATION_FAILURES_DESC, 0L);
     syncToSafTransitions =
       getMetricsRegistry().newCounter(SYNC_TO_SAF_TRANSITIONS, 
SYNC_TO_SAF_TRANSITIONS_DESC, 0L);
-    appendTime = getMetricsRegistry().newHistogram(APPEND_TIME, 
APPEND_TIME_DESC);
-    syncTime = getMetricsRegistry().newHistogram(SYNC_TIME, SYNC_TIME_DESC);
-    rotationTime = getMetricsRegistry().newHistogram(ROTATION_TIME, 
ROTATION_TIME_DESC);
-    ringBufferTime = getMetricsRegistry().newHistogram(RING_BUFFER_TIME, 
RING_BUFFER_TIME_DESC);
-    fsSyncTime = getMetricsRegistry().newHistogram(FS_SYNC_TIME, 
FS_SYNC_TIME_DESC);
-    batchSize = getMetricsRegistry().newHistogram(BATCH_SIZE, BATCH_SIZE_DESC);
+    appendTimeNs = getMetricsRegistry().newHistogram(APPEND_TIME, 
APPEND_TIME_DESC);
+    syncTimeMs = getMetricsRegistry().newTimeHistogram(SYNC_TIME, 
SYNC_TIME_DESC);
+    rotationTimeMs = getMetricsRegistry().newTimeHistogram(ROTATION_TIME, 
ROTATION_TIME_DESC);
+    ringBufferTimeNs = getMetricsRegistry().newHistogram(RING_BUFFER_TIME, 
RING_BUFFER_TIME_DESC);
+    fsSyncTimeMs = getMetricsRegistry().newTimeHistogram(FS_SYNC_TIME, 
FS_SYNC_TIME_DESC);
+    batchSize = getMetricsRegistry().newSizeHistogram(BATCH_SIZE, 
BATCH_SIZE_DESC);
     pendingSyncCount =
-      getMetricsRegistry().newHistogram(PENDING_SYNC_COUNT, 
PENDING_SYNC_COUNT_DESC);
-    pendingSyncWaitTime =
+      getMetricsRegistry().newSizeHistogram(PENDING_SYNC_COUNT, 
PENDING_SYNC_COUNT_DESC);
+    pendingSyncWaitTimeNs =
       getMetricsRegistry().newHistogram(PENDING_SYNC_WAIT_TIME, 
PENDING_SYNC_WAIT_TIME_DESC);
   }
 
@@ -85,27 +91,27 @@ public class MetricsReplicationLogGroupSourceImpl extends 
BaseSourceImpl
 
   @Override
   public void updateAppendTime(long timeNs) {
-    appendTime.add(timeNs);
+    appendTimeNs.add(timeNs);
   }
 
   @Override
   public void updateSyncTime(long timeNs) {
-    syncTime.add(timeNs);
+    syncTimeMs.add(TimeUnit.NANOSECONDS.toMillis(timeNs));
   }
 
   @Override
   public void updateRotationTime(long timeNs) {
-    rotationTime.add(timeNs);
+    rotationTimeMs.add(TimeUnit.NANOSECONDS.toMillis(timeNs));
   }
 
   @Override
   public void updateRingBufferTime(long timeNs) {
-    ringBufferTime.add(timeNs);
+    ringBufferTimeNs.add(timeNs);
   }
 
   @Override
   public void updateFsSyncTime(long timeNs) {
-    fsSyncTime.add(timeNs);
+    fsSyncTimeMs.add(TimeUnit.NANOSECONDS.toMillis(timeNs));
   }
 
   @Override
@@ -120,18 +126,51 @@ public class MetricsReplicationLogGroupSourceImpl extends 
BaseSourceImpl
 
   @Override
   public void updatePendingSyncWaitTime(long timeNs) {
-    pendingSyncWaitTime.add(timeNs);
+    pendingSyncWaitTimeNs.add(timeNs);
   }
 
+  /**
+   * Test-facing accessor that snapshots the four time histograms used in the 
producer-side sync
+   * decomposition (sync, ringBuffer, fsSync, pendingSyncWait) and returns max 
+ p50 + p99 for each.
+   * <p>
+   * <b>Side effect:</b> the snapshot resets the underlying FastLongHistogram 
bins, so this method
+   * is destructive and intended for end-of-test inspection only. Subsequent 
histogram reads will
+   * see only data added after this call.
+   */
   @Override
   public ReplicationLogMetricValues getCurrentMetricValues() {
+    Snapshot syncSnap = snapshot(syncTimeMs);
+    Snapshot ringSnap = snapshot(ringBufferTimeNs);
+    Snapshot fsSnap = snapshot(fsSyncTimeMs);
+    Snapshot pendSnap = snapshot(pendingSyncWaitTimeNs);
     return 
ReplicationLogMetricValues.builder().rotationCount(rotationCount.value())
       .rotationFailuresCount(rotationFailuresCount.value())
-      
.syncToSafTransitions(syncToSafTransitions.value()).appendTime(appendTime.getMax())
-      .syncTime(syncTime.getMax()).rotationTime(rotationTime.getMax())
-      .ringBufferTime(ringBufferTime.getMax()).fsSyncTime(fsSyncTime.getMax())
-      
.batchSize(batchSize.getMax()).pendingSyncCount(pendingSyncCount.getMax())
-      .pendingSyncWaitTime(pendingSyncWaitTime.getMax()).build();
+      
.syncToSafTransitions(syncToSafTransitions.value()).appendTimeMax(appendTimeNs.getMax())
+      .syncTimeMax(syncSnap.getMax()).syncTimeP50(syncSnap.getMedian())
+      
.syncTimeP99(syncSnap.get99thPercentile()).rotationTimeMax(rotationTimeMs.getMax())
+      
.ringBufferTimeMax(ringSnap.getMax()).ringBufferTimeP50(ringSnap.getMedian())
+      
.ringBufferTimeP99(ringSnap.get99thPercentile()).fsSyncTimeMax(fsSnap.getMax())
+      
.fsSyncTimeP50(fsSnap.getMedian()).fsSyncTimeP99(fsSnap.get99thPercentile())
+      
.batchSizeMax(batchSize.getMax()).pendingSyncCountMax(pendingSyncCount.getMax())
+      
.pendingSyncWaitTimeMax(pendSnap.getMax()).pendingSyncWaitTimeP50(pendSnap.getMedian())
+      .pendingSyncWaitTimeP99(pendSnap.get99thPercentile()).build();
+  }
+
+  /**
+   * Reach into MutableHistogram via reflection to call 
HistogramImpl.snapshot(), which exposes the
+   * full percentile distribution. The protected {@code 
MutableHistogram.histogram} field is the
+   * only path to these percentiles short of subclassing every histogram type.
+   */
+  private static Snapshot snapshot(MutableHistogram histogram) {
+    try {
+      Field field = MutableHistogram.class.getDeclaredField("histogram");
+      field.setAccessible(true);
+      HistogramImpl impl = (HistogramImpl) field.get(histogram);
+      return impl.snapshot();
+    } catch (ReflectiveOperationException e) {
+      throw new IllegalStateException("Failed to access 
MutableHistogram.histogram via reflection",
+        e);
+    }
   }
 
   @Override
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
index 3fa79d1d02..c177743d6b 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
@@ -17,33 +17,54 @@
  */
 package org.apache.phoenix.replication.metrics;
 
-/** Class to hold the values of all metrics tracked by the ReplicationLog 
metrics source. */
+/**
+ * Class to hold the values of all metrics tracked by the ReplicationLog 
metrics source. The
+ * time-named getters (e.g. {@link #getSyncTime()}) return the max observed 
value. Percentile
+ * accessors (e.g. {@link #getSyncTimeP50()}, {@link #getSyncTimeP99()}) are 
populated when the
+ * source produces this DTO via a destructive snapshot path; otherwise they 
are zero.
+ */
 public class ReplicationLogMetricValues {
 
   private final long rotationCount;
   private final long rotationFailuresCount;
   private final long syncToSafTransitions;
-  private final long appendTime;
-  private final long syncTime;
-  private final long rotationTime;
-  private final long ringBufferTime;
-  private final long fsSyncTime;
+  private final long appendTimeNs;
+  private final long syncTimeMs;
+  private final long syncTimeP50Ms;
+  private final long syncTimeP99Ms;
+  private final long rotationTimeMs;
+  private final long ringBufferTimeNs;
+  private final long ringBufferTimeP50Ns;
+  private final long ringBufferTimeP99Ns;
+  private final long fsSyncTimeMs;
+  private final long fsSyncTimeP50Ms;
+  private final long fsSyncTimeP99Ms;
   private final long batchSize;
   private final long pendingSyncCount;
-  private final long pendingSyncWaitTime;
+  private final long pendingSyncWaitTimeNs;
+  private final long pendingSyncWaitTimeP50Ns;
+  private final long pendingSyncWaitTimeP99Ns;
 
   private ReplicationLogMetricValues(Builder b) {
     this.rotationCount = b.rotationCount;
     this.rotationFailuresCount = b.rotationFailuresCount;
     this.syncToSafTransitions = b.syncToSafTransitions;
-    this.appendTime = b.appendTime;
-    this.syncTime = b.syncTime;
-    this.rotationTime = b.rotationTime;
-    this.ringBufferTime = b.ringBufferTime;
-    this.fsSyncTime = b.fsSyncTime;
+    this.appendTimeNs = b.appendTimeNs;
+    this.syncTimeMs = b.syncTimeMs;
+    this.syncTimeP50Ms = b.syncTimeP50Ms;
+    this.syncTimeP99Ms = b.syncTimeP99Ms;
+    this.rotationTimeMs = b.rotationTimeMs;
+    this.ringBufferTimeNs = b.ringBufferTimeNs;
+    this.ringBufferTimeP50Ns = b.ringBufferTimeP50Ns;
+    this.ringBufferTimeP99Ns = b.ringBufferTimeP99Ns;
+    this.fsSyncTimeMs = b.fsSyncTimeMs;
+    this.fsSyncTimeP50Ms = b.fsSyncTimeP50Ms;
+    this.fsSyncTimeP99Ms = b.fsSyncTimeP99Ms;
     this.batchSize = b.batchSize;
     this.pendingSyncCount = b.pendingSyncCount;
-    this.pendingSyncWaitTime = b.pendingSyncWaitTime;
+    this.pendingSyncWaitTimeNs = b.pendingSyncWaitTimeNs;
+    this.pendingSyncWaitTimeP50Ns = b.pendingSyncWaitTimeP50Ns;
+    this.pendingSyncWaitTimeP99Ns = b.pendingSyncWaitTimeP99Ns;
   }
 
   public static Builder builder() {
@@ -62,50 +83,90 @@ public class ReplicationLogMetricValues {
     return syncToSafTransitions;
   }
 
-  public long getAppendTime() {
-    return appendTime;
+  public long getAppendTimeMax() {
+    return appendTimeNs;
+  }
+
+  public long getSyncTimeMax() {
+    return syncTimeMs;
+  }
+
+  public long getSyncTimeP50() {
+    return syncTimeP50Ms;
+  }
+
+  public long getSyncTimeP99() {
+    return syncTimeP99Ms;
+  }
+
+  public long getRotationTimeMax() {
+    return rotationTimeMs;
+  }
+
+  public long getRingBufferTimeMax() {
+    return ringBufferTimeNs;
   }
 
-  public long getSyncTime() {
-    return syncTime;
+  public long getRingBufferTimeP50() {
+    return ringBufferTimeP50Ns;
   }
 
-  public long getRotationTime() {
-    return rotationTime;
+  public long getRingBufferTimeP99() {
+    return ringBufferTimeP99Ns;
   }
 
-  public long getRingBufferTime() {
-    return ringBufferTime;
+  public long getFsSyncTimeMax() {
+    return fsSyncTimeMs;
   }
 
-  public long getFsSyncTime() {
-    return fsSyncTime;
+  public long getFsSyncTimeP50() {
+    return fsSyncTimeP50Ms;
   }
 
-  public long getBatchSize() {
+  public long getFsSyncTimeP99() {
+    return fsSyncTimeP99Ms;
+  }
+
+  public long getBatchSizeMax() {
     return batchSize;
   }
 
-  public long getPendingSyncCount() {
+  public long getPendingSyncCountMax() {
     return pendingSyncCount;
   }
 
-  public long getPendingSyncWaitTime() {
-    return pendingSyncWaitTime;
+  public long getPendingSyncWaitTimeMax() {
+    return pendingSyncWaitTimeNs;
+  }
+
+  public long getPendingSyncWaitTimeP50() {
+    return pendingSyncWaitTimeP50Ns;
+  }
+
+  public long getPendingSyncWaitTimeP99() {
+    return pendingSyncWaitTimeP99Ns;
   }
 
   public static class Builder {
     private long rotationCount;
     private long rotationFailuresCount;
     private long syncToSafTransitions;
-    private long appendTime;
-    private long syncTime;
-    private long rotationTime;
-    private long ringBufferTime;
-    private long fsSyncTime;
+    private long appendTimeNs;
+    private long syncTimeMs;
+    private long syncTimeP50Ms;
+    private long syncTimeP99Ms;
+    private long rotationTimeMs;
+    private long ringBufferTimeNs;
+    private long ringBufferTimeP50Ns;
+    private long ringBufferTimeP99Ns;
+    private long fsSyncTimeMs;
+    private long fsSyncTimeP50Ms;
+    private long fsSyncTimeP99Ms;
     private long batchSize;
     private long pendingSyncCount;
-    private long pendingSyncWaitTime;
+    private long pendingSyncWaitTimeNs;
+    private long pendingSyncWaitTimeP50Ns;
+    private long pendingSyncWaitTimeP99Ns;
 
     public Builder rotationCount(long v) {
       this.rotationCount = v;
@@ -122,43 +183,83 @@ public class ReplicationLogMetricValues {
       return this;
     }
 
-    public Builder appendTime(long v) {
-      this.appendTime = v;
+    public Builder appendTimeMax(long v) {
+      this.appendTimeNs = v;
       return this;
     }
 
-    public Builder syncTime(long v) {
-      this.syncTime = v;
+    public Builder syncTimeMax(long v) {
+      this.syncTimeMs = v;
       return this;
     }
 
-    public Builder rotationTime(long v) {
-      this.rotationTime = v;
+    public Builder syncTimeP50(long v) {
+      this.syncTimeP50Ms = v;
       return this;
     }
 
-    public Builder ringBufferTime(long v) {
-      this.ringBufferTime = v;
+    public Builder syncTimeP99(long v) {
+      this.syncTimeP99Ms = v;
       return this;
     }
 
-    public Builder fsSyncTime(long v) {
-      this.fsSyncTime = v;
+    public Builder rotationTimeMax(long v) {
+      this.rotationTimeMs = v;
       return this;
     }
 
-    public Builder batchSize(long v) {
+    public Builder ringBufferTimeMax(long v) {
+      this.ringBufferTimeNs = v;
+      return this;
+    }
+
+    public Builder ringBufferTimeP50(long v) {
+      this.ringBufferTimeP50Ns = v;
+      return this;
+    }
+
+    public Builder ringBufferTimeP99(long v) {
+      this.ringBufferTimeP99Ns = v;
+      return this;
+    }
+
+    public Builder fsSyncTimeMax(long v) {
+      this.fsSyncTimeMs = v;
+      return this;
+    }
+
+    public Builder fsSyncTimeP50(long v) {
+      this.fsSyncTimeP50Ms = v;
+      return this;
+    }
+
+    public Builder fsSyncTimeP99(long v) {
+      this.fsSyncTimeP99Ms = v;
+      return this;
+    }
+
+    public Builder batchSizeMax(long v) {
       this.batchSize = v;
       return this;
     }
 
-    public Builder pendingSyncCount(long v) {
+    public Builder pendingSyncCountMax(long v) {
       this.pendingSyncCount = v;
       return this;
     }
 
-    public Builder pendingSyncWaitTime(long v) {
-      this.pendingSyncWaitTime = v;
+    public Builder pendingSyncWaitTimeMax(long v) {
+      this.pendingSyncWaitTimeNs = v;
+      return this;
+    }
+
+    public Builder pendingSyncWaitTimeP50(long v) {
+      this.pendingSyncWaitTimeP50Ns = v;
+      return this;
+    }
+
+    public Builder pendingSyncWaitTimeP99(long v) {
+      this.pendingSyncWaitTimeP99Ns = v;
       return this;
     }
 
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
index d70dabeb47..776fbcb509 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
@@ -181,18 +181,20 @@ public class ReplicationLogGroupIT extends HABaseIT {
 
   private void assertMetricsEmitted() {
     ReplicationLogMetricValues values = 
logGroup.getMetrics().getCurrentMetricValues();
-    assertTrue("appendTime should be > 0, got " + values.getAppendTime(),
-      values.getAppendTime() > 0);
-    assertTrue("syncTime should be > 0, got " + values.getSyncTime(), 
values.getSyncTime() > 0);
-    assertTrue("ringBufferTime should be > 0, got " + 
values.getRingBufferTime(),
-      values.getRingBufferTime() > 0);
-    assertTrue("fsSyncTime should be > 0, got " + values.getFsSyncTime(),
-      values.getFsSyncTime() > 0);
-    assertTrue("batchSize should be > 0, got " + values.getBatchSize(), 
values.getBatchSize() > 0);
-    assertTrue("pendingSyncCount should be > 0, got " + 
values.getPendingSyncCount(),
-      values.getPendingSyncCount() > 0);
-    assertTrue("pendingSyncWaitTime should be > 0, got " + 
values.getPendingSyncWaitTime(),
-      values.getPendingSyncWaitTime() > 0);
+    assertTrue("appendTime should be > 0, got " + values.getAppendTimeMax(),
+      values.getAppendTimeMax() > 0);
+    assertTrue("syncTime should be > 0, got " + values.getSyncTimeMax(),
+      values.getSyncTimeMax() > 0);
+    assertTrue("ringBufferTime should be > 0, got " + 
values.getRingBufferTimeMax(),
+      values.getRingBufferTimeMax() > 0);
+    assertTrue("fsSyncTime should be > 0, got " + values.getFsSyncTimeMax(),
+      values.getFsSyncTimeMax() > 0);
+    assertTrue("batchSize should be > 0, got " + values.getBatchSizeMax(),
+      values.getBatchSizeMax() > 0);
+    assertTrue("pendingSyncCount should be > 0, got " + 
values.getPendingSyncCountMax(),
+      values.getPendingSyncCountMax() > 0);
+    assertTrue("pendingSyncWaitTime should be > 0, got " + 
values.getPendingSyncWaitTimeMax(),
+      values.getPendingSyncWaitTimeMax() > 0);
   }
 
   private void dumpTableLogCount(Map<String, List<Mutation>> mutationsByTable) 
{
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
index 349fdb8620..2f81671835 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/ReplicationLogGroupTest.java
@@ -45,8 +45,12 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.phoenix.jdbc.HAGroupStoreRecord;
@@ -58,6 +62,8 @@ import 
org.apache.phoenix.replication.log.LogFileReaderContext;
 import org.apache.phoenix.replication.log.LogFileTestUtil;
 import org.apache.phoenix.replication.log.LogFileWriter;
 import org.apache.phoenix.replication.log.LogFileWriterContext;
+import org.apache.phoenix.replication.metrics.ReplicationLogMetricValues;
+import org.junit.Assume;
 import org.junit.Test;
 import org.mockito.InOrder;
 import org.mockito.Mockito;
@@ -2075,4 +2081,150 @@ public class ReplicationLogGroupTest extends 
ReplicationLogBaseTest {
     holdConsumer.countDown();
     filler.join(5000);
   }
+
+  /**
+   * Measures sync coalescing effectiveness when many producer threads 
append+sync concurrently.
+   * Adds a small delay inside the inner writer's sync() so the consumer holds 
long enough for
+   * additional SYNC events to queue behind it on the ring buffer; under 
contention the
+   * LogEventHandler should consolidate them into a single inner sync per 
Disruptor batch. Logs the
+   * producer-sync count, inner-sync count, coalescing ratio, and metric 
histograms; no assertions —
+   * purpose is observation.
+   */
+  @Test
+  public void testReplicationSyncPathSimulator() throws Exception {
+    Assume.assumeTrue("Simulator test, opt in with -Dtest.runSimulator=true",
+      Boolean.getBoolean("test.runSimulator"));
+    final String tableName = "TBLSCM";
+    final int producerCount = Integer.getInteger("test.producerCount", 64);
+    final int syncsPerProducer = Integer.getInteger("test.syncsPerProducer", 
20);
+    final int appendsPerSync = Integer.getInteger("test.appendsPerSync", 5);
+    final int cellsPerMutation = Integer.getInteger("test.cellsPerMutation", 
1);
+    final long innerSyncDelayMs = Long.getLong("test.innerSyncDelayMs", 2);
+
+    // Use the production-default ring buffer size so producers are not 
artificially blocked on
+    // ringBuffer.next() — the default test fixture uses a 32-slot buffer 
which fills under
+    // contention and inflates the producer-perceived sync latency.
+    conf.setInt(ReplicationLogGroup.REPLICATION_LOG_RINGBUFFER_SIZE_KEY,
+      ReplicationLogGroup.DEFAULT_REPLICATION_LOG_RINGBUFFER_SIZE);
+    // Disable size-based rotation for the duration of the run so coalescing 
measurements are not
+    // contaminated by rotation overhead. The fixture's 10 KB threshold 
otherwise causes 5-20
+    // rotations per run at high producerCount.
+    conf.setLong(ReplicationLogGroup.REPLICATION_LOG_ROTATION_SIZE_BYTES_KEY, 
Long.MAX_VALUE);
+    recreateLogGroup();
+
+    final List<LogFileWriter> allWriters =
+      java.util.Collections.synchronizedList(new ArrayList<>());
+
+    // Apply the sleep stub to a writer so concurrent producers' SYNC events 
queue behind it,
+    // and register it for later invocation counting.
+    java.util.function.Consumer<LogFileWriter> instrumentWriter = w -> {
+      try {
+        doAnswer(invocation -> {
+          sleep(innerSyncDelayMs);
+          return invocation.callRealMethod();
+        }).when(w).sync();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      allWriters.add(w);
+    };
+
+    // Instrument the initial writer.
+    LogFileWriter initialWriter = logGroup.getActiveLog().getWriter();
+    assertNotNull("Inner writer should not be null", initialWriter);
+    instrumentWriter.accept(initialWriter);
+
+    // Instrument every writer the rotation path creates.
+    ReplicationLog activeLog = logGroup.getActiveLog();
+    doAnswer(invocation -> {
+      LogFileWriter w = (LogFileWriter) invocation.callRealMethod();
+      instrumentWriter.accept(w);
+      return w;
+    }).when(activeLog).createNewWriter();
+
+    final CountDownLatch startLatch = new CountDownLatch(1);
+    final CountDownLatch doneLatch = new CountDownLatch(producerCount);
+    final AtomicLong totalProducerSyncs = new AtomicLong(0);
+    final AtomicLong totalProducerAppends = new AtomicLong(0);
+    final AtomicLong commitIdSeq = new AtomicLong(0);
+    ExecutorService pool = Executors.newFixedThreadPool(producerCount);
+
+    try {
+      for (int p = 0; p < producerCount; p++) {
+        pool.submit(() -> {
+          try {
+            startLatch.await();
+            for (int i = 0; i < syncsPerProducer; i++) {
+              for (int j = 0; j < appendsPerSync; j++) {
+                long commitId = commitIdSeq.getAndIncrement();
+                Mutation put = LogFileTestUtil.newPut("row" + commitId, 
commitId, cellsPerMutation);
+                logGroup.append(tableName, commitId, put);
+                totalProducerAppends.incrementAndGet();
+              }
+              logGroup.sync();
+              totalProducerSyncs.incrementAndGet();
+            }
+          } catch (Exception e) {
+            LOG.error("Producer failed", e);
+          } finally {
+            doneLatch.countDown();
+          }
+        });
+      }
+
+      long startNs = System.nanoTime();
+      startLatch.countDown();
+      doneLatch.await();
+      long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNs);
+
+      // Count actual fsyncs invoked across all writers (rotation creates new 
ones mid-test).
+      int innerSyncCount = 0;
+      synchronized (allWriters) {
+        for (LogFileWriter w : allWriters) {
+          innerSyncCount += (int) 
Mockito.mockingDetails(w).getInvocations().stream()
+            .filter(inv -> "sync".equals(inv.getMethod().getName())).count();
+        }
+      }
+      int writerCount = allWriters.size();
+      long producerSyncs = totalProducerSyncs.get();
+      double coalescingRatio = innerSyncCount == 0 ? 0.0 : (double) 
producerSyncs / innerSyncCount;
+
+      LOG.info(
+        "Sync coalescing: producers={} syncsPerProducer={} appendsPerSync={} 
cellsPerMutation={} "
+          + "totalProducerAppends={} totalProducerSyncs={} innerSyncCalls={} 
writerCount={} "
+          + "ratio={} elapsedMs={} innerSyncDelayMs={}",
+        producerCount, syncsPerProducer, appendsPerSync, cellsPerMutation,
+        totalProducerAppends.get(), producerSyncs, innerSyncCount, writerCount,
+        String.format("%.2f", coalescingRatio), elapsedMs, innerSyncDelayMs);
+      ReplicationLogMetricValues metricValues = 
logGroup.getMetrics().getCurrentMetricValues();
+      // syncTime, fsSyncTime histograms record in milliseconds; 
ringBufferTime and
+      // pendingSyncWaitTime record in nanoseconds. Compare the decomposition 
in nanoseconds to
+      // avoid ms-truncation losses. Each ms-stored value loses up to ~1ms of 
precision via floor
+      // truncation. fsSyncTime appears on the component side (its floor 
truncation makes
+      // componentSumNs undershoot), so a ~1ms tolerance is needed for the 
comparison to hold
+      // reliably at low absolute values.
+      long ringBufferTimeNs = metricValues.getRingBufferTimeMax();
+      long pendingSyncWaitTimeNs = metricValues.getPendingSyncWaitTimeMax();
+      long fsSyncTimeNs = 
TimeUnit.MILLISECONDS.toNanos(metricValues.getFsSyncTimeMax());
+      long syncTimeNs = 
TimeUnit.MILLISECONDS.toNanos(metricValues.getSyncTimeMax());
+      long componentSumNs = ringBufferTimeNs + pendingSyncWaitTimeNs + 
fsSyncTimeNs;
+      long truncationToleranceNs = TimeUnit.MILLISECONDS.toNanos(1);
+      LOG.info(
+        "Metrics snapshot: maxBatchSize={} maxPendingSyncCount={}"
+          + " syncTime[p50={}ms p99={}ms max={}ms]" + " ringBuffer[p50={}ns 
p99={}ns max={}ns]"
+          + " fsSync[p50={}ms p99={}ms max={}ms]" + " pendingSyncWait[p50={}ns 
p99={}ns max={}ns]"
+          + " maxComponentSumNs={} syncTimeWithinBound={}",
+        metricValues.getBatchSizeMax(), metricValues.getPendingSyncCountMax(),
+        metricValues.getSyncTimeP50(), metricValues.getSyncTimeP99(), 
metricValues.getSyncTimeMax(),
+        metricValues.getRingBufferTimeP50(), 
metricValues.getRingBufferTimeP99(),
+        metricValues.getRingBufferTimeMax(), metricValues.getFsSyncTimeP50(),
+        metricValues.getFsSyncTimeP99(), metricValues.getFsSyncTimeMax(),
+        metricValues.getPendingSyncWaitTimeP50(), 
metricValues.getPendingSyncWaitTimeP99(),
+        metricValues.getPendingSyncWaitTimeMax(), componentSumNs,
+        syncTimeNs <= componentSumNs + truncationToleranceNs);
+    } finally {
+      pool.shutdownNow();
+      pool.awaitTermination(5, TimeUnit.SECONDS);
+    }
+  }
 }
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImplTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImplTest.java
new file mode 100644
index 0000000000..a53c54280e
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImplTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.metrics;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link MetricsReplicationLogGroupSourceImpl}, focused on 
verifying that the
+ * histogram unit conventions documented in {@link 
MetricsReplicationLogGroupSource} are honored:
+ * appendTime and pendingSyncWaitTime record nanoseconds verbatim; syncTime, 
fsSyncTime,
+ * ringBufferTime, and rotationTime convert input nanoseconds to milliseconds 
before recording.
+ */
+public class MetricsReplicationLogGroupSourceImplTest {
+
+  private MetricsReplicationLogGroupSourceImpl source;
+
+  @Before
+  public void setUp() {
+    source = new MetricsReplicationLogGroupSourceImpl("testHaGroup");
+  }
+
+  @After
+  public void tearDown() {
+    source.close();
+  }
+
+  @Test
+  public void testNanosecondHistogramsRecordInputVerbatim() {
+    source.updateAppendTime(1500L);
+    assertEquals(1500L, source.getCurrentMetricValues().getAppendTimeMax());
+
+    source.updatePendingSyncWaitTime(2_500_000L);
+    assertEquals(2_500_000L, 
source.getCurrentMetricValues().getPendingSyncWaitTimeMax());
+
+    source.updateRingBufferTime(7_500_000L);
+    assertEquals(7_500_000L, 
source.getCurrentMetricValues().getRingBufferTimeMax());
+  }
+
+  @Test
+  public void testMillisecondHistogramsConvertNsToMs() {
+    source.updateSyncTime(5_000_000L);
+    assertEquals(5L, source.getCurrentMetricValues().getSyncTimeMax());
+
+    source.updateFsSyncTime(1_000_000L);
+    assertEquals(1L, source.getCurrentMetricValues().getFsSyncTimeMax());
+
+    source.updateRotationTime(35_000_000L);
+    assertEquals(35L, source.getCurrentMetricValues().getRotationTimeMax());
+  }
+
+  @Test
+  public void testSubMillisecondInputTruncatesToZero() {
+    source.updateSyncTime(500_000L);
+    assertEquals(0L, source.getCurrentMetricValues().getSyncTimeMax());
+  }
+
+  @Test
+  public void testHistogramReportsMaxAcrossSamples() {
+    source.updateSyncTime(2_000_000L);
+    source.updateSyncTime(10_000_000L);
+    source.updateSyncTime(5_000_000L);
+    assertEquals(10L, source.getCurrentMetricValues().getSyncTimeMax());
+  }
+
+  @Test
+  public void testCounters() {
+    source.incrementRotationCount();
+    source.incrementRotationCount();
+    source.incrementRotationFailureCount();
+    source.incrementSyncToSafTransitions();
+
+    ReplicationLogMetricValues v = source.getCurrentMetricValues();
+    assertEquals(2L, v.getRotationCount());
+    assertEquals(1L, v.getRotationFailuresCount());
+    assertEquals(1L, v.getSyncToSafTransitions());
+  }
+
+  @Test
+  public void testBatchSizeAndPendingSyncCount() {
+    source.updateBatchSize(100L);
+    source.updatePendingSyncCount(50L);
+    ReplicationLogMetricValues v = source.getCurrentMetricValues();
+    assertEquals(100L, v.getBatchSizeMax());
+    assertEquals(50L, v.getPendingSyncCountMax());
+  }
+}


Reply via email to