This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
     new 97d34ae608 PHOENIX-7877 More granular metrics for ReplicationLogWriter (#2492)
97d34ae608 is described below

commit 97d34ae608206b8a1e889151854acc8578bf2280
Author: tkhurana <[email protected]>
AuthorDate: Wed Jun 3 17:03:24 2026 -0700

    PHOENIX-7877 More granular metrics for ReplicationLogWriter (#2492)
    
    Adds histograms to decompose ReplicationLogGroup sync latency: pendingSyncWaitTime
    measures the wait between consumer pickup of a SYNC event and start of fsync;
    fsSyncTime (renamed from modeSyncTime) measures the underlying filesystem sync.
    Also instruments rotationTime in ReplicationLog so the existing metric is actually
    emitted. Introduces a builder for ReplicationLogMetricValues to avoid positional
    argument hazards as the metric set grows.
---
 .../apache/phoenix/replication/ReplicationLog.java |   3 +
 .../phoenix/replication/ReplicationLogGroup.java   |  58 +++++++---
 .../metrics/MetricsReplicationLogGroupSource.java  |  48 ++++++++-
 .../MetricsReplicationLogGroupSourceImpl.java      |  40 ++++++-
 .../metrics/ReplicationLogMetricValues.java        | 118 +++++++++++++++++++--
 .../phoenix/replication/ReplicationLogGroupIT.java |  22 ++++
 6 files changed, 260 insertions(+), 29 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
index 3949b3ef2b..410bd17ddb 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -437,6 +437,7 @@ public class ReplicationLog {
         return;
       }
       boolean staged = false;
+      long startNs = System.nanoTime();
       try {
         LogFileWriter newWriter = createNewWriter();
         LogFileWriter undrained = pendingWriter.getAndSet(newWriter);
@@ -458,6 +459,8 @@ public class ReplicationLog {
             numFailures, maxRotationRetries, t);
         }
       } finally {
+        // Time both success and failure paths so slow rotations are visible even when they fail.
+        logGroup.getMetrics().updateRotationTime(System.nanoTime() - startNs);
         if (onDemand) {
           // Clear the flag last so requestRotation()'s CAS rejects duplicate on-demand
           // submissions while this task is still creating/staging a writer.
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index c1e531461a..0d7a9f42e4 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -962,12 +962,27 @@ public class ReplicationLogGroup {
   }
 
   /**
-   * Handles events from the Disruptor,
+   * Holder for a pending SYNC event captured at consumer pickup time. Stored separately from the
+   * LogEvent because Disruptor reuses ring buffer slots — retaining a LogEvent reference past the
+   * end of its onEvent invocation is unsafe.
    */
+  private static class PendingSync {
+    final CompletableFuture<Void> future;
+    final long pickupTimeNs;
+
+    PendingSync(CompletableFuture<Void> future, long pickupTimeNs) {
+      this.future = future;
+      this.pickupTimeNs = pickupTimeNs;
+    }
+  }
+
+  /** Handles events from the Disruptor. */
   protected class LogEventHandler implements EventHandler<LogEvent>, 
LifecycleAware {
-    private final List<CompletableFuture<Void>> pendingSyncFutures = new 
ArrayList<>();
+    private final List<PendingSync> pendingSyncs = new ArrayList<>();
     private ReplicationModeImpl currentModeImpl;
     private volatile IOException fatalException;
+    // Counts events drained per Disruptor batch. Single-threaded access from onEvent.
+    private int batchEventCount;
 
     public LogEventHandler() {
     }
@@ -1028,16 +1043,27 @@ public class ReplicationLogGroup {
      * @throws IOException if the sync operation fails
      */
     private void processPendingSyncs(long sequence) throws IOException {
-      if (pendingSyncFutures.isEmpty()) {
+      int pendingSyncCount = pendingSyncs.size();
+      if (pendingSyncCount == 0) {
         return;
       }
+      metrics.updatePendingSyncCount(pendingSyncCount);
+      // Record per-event wait between SYNC pickup and fsync start.
+      long syncStartNs = System.nanoTime();
+      for (PendingSync ps : pendingSyncs) {
+        metrics.updatePendingSyncWaitTime(syncStartNs - ps.pickupTimeNs);
+      }
       // call sync on the current mode
-      currentModeImpl.sync();
+      try {
+        currentModeImpl.sync();
+      } finally {
+        metrics.updateFsSyncTime(System.nanoTime() - syncStartNs);
+      }
       // Complete all pending sync futures
-      for (CompletableFuture<Void> future : pendingSyncFutures) {
-        future.complete(null);
+      for (PendingSync ps : pendingSyncs) {
+        ps.future.complete(null);
       }
-      pendingSyncFutures.clear();
+      pendingSyncs.clear();
       LOG.debug("Sync operation completed successfully up to sequence {}", 
sequence);
       // after a successful sync check the mode set on the replication group
       // Doing the mode check on sync points makes the implementation more 
robust
@@ -1068,13 +1094,13 @@ public class ReplicationLogGroup {
      * @param e        The IOException that caused the failure
      */
     private void failPendingSyncs(long sequence, IOException e) {
-      if (pendingSyncFutures.isEmpty()) {
+      if (pendingSyncs.isEmpty()) {
         return;
       }
-      for (CompletableFuture<Void> future : pendingSyncFutures) {
-        future.completeExceptionally(e);
+      for (PendingSync ps : pendingSyncs) {
+        ps.future.completeExceptionally(e);
       }
-      pendingSyncFutures.clear();
+      pendingSyncs.clear();
       LOG.warn("Failed to process syncs at sequence {}", sequence, e);
     }
 
@@ -1145,6 +1171,7 @@ public class ReplicationLogGroup {
       long currentTimeNs = System.nanoTime();
       long ringBufferTimeNs = currentTimeNs - event.timestampNs;
       metrics.updateRingBufferTime(ringBufferTimeNs);
+      batchEventCount++;
       if (fatalException != null) {
         // Append events are ignored; sync futures are failed immediately
         // so producer threads unblock without waiting for the sync timeout.
@@ -1159,7 +1186,7 @@ public class ReplicationLogGroup {
             currentModeImpl.append(event.record);
             break;
           case EVENT_TYPE_SYNC:
-            pendingSyncFutures.add(event.syncFuture);
+            pendingSyncs.add(new PendingSync(event.syncFuture, currentTimeNs));
             break;
           case EVENT_TYPE_SWAP:
             // Wake-up marker from LogRotationTask. Drain the staged writer so 
the old writer is
@@ -1188,6 +1215,13 @@ public class ReplicationLogGroup {
           new IOException("Unexpected error in event handler at sequence " + 
sequence, t);
         setFatalException(wrapped);
         failPendingSyncs(sequence, wrapped);
+      } finally {
+        // Reset on endOfBatch regardless of success/failure so the counter never leaks
+        // across batches when an exception path bypasses processPendingSyncs.
+        if (endOfBatch) {
+          metrics.updateBatchSize(batchEventCount);
+          batchEventCount = 0;
+        }
       }
     }
 
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
index b81df30b5f..6c0392f2b7 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
@@ -33,17 +33,33 @@ public interface MetricsReplicationLogGroupSource extends 
BaseSource {
   String ROTATION_FAILURES = "rotationFailures";
 
   String ROTATION_FAILURES_DESC = "Number of times log rotation has failed";
-  String APPEND_TIME = "appendTimeMs";
+
+  // All time histograms in this source are nanoseconds.
+  String APPEND_TIME = "appendTime";
   String APPEND_TIME_DESC = "Histogram of time taken for append operations in 
nanoseconds";
 
-  String SYNC_TIME = "syncTimeMs";
+  String SYNC_TIME = "syncTime";
   String SYNC_TIME_DESC = "Histogram of time taken for sync operations in 
nanoseconds";
 
-  String ROTATION_TIME = "rotationTimeMs";
+  String ROTATION_TIME = "rotationTime";
   String ROTATION_TIME_DESC = "Histogram of time taken for log rotations in 
nanoseconds";
 
   String RING_BUFFER_TIME = "ringBufferTime";
-  String RING_BUFFER_TIME_DESC = "Time events spend in the ring buffer";
+  String RING_BUFFER_TIME_DESC = "Time events spend in the ring buffer in 
nanoseconds";
+
+  String FS_SYNC_TIME = "fsSyncTime";
+  String FS_SYNC_TIME_DESC =
+    "Histogram of time taken for the underlying filesystem sync (fsync) in 
nanoseconds";
+
+  String BATCH_SIZE = "batchSize";
+  String BATCH_SIZE_DESC = "Histogram of number of events drained per 
Disruptor batch";
+
+  String PENDING_SYNC_COUNT = "pendingSyncCount";
+  String PENDING_SYNC_COUNT_DESC = "Histogram of pending sync futures 
coalesced into one fsync";
+
+  String PENDING_SYNC_WAIT_TIME = "pendingSyncWaitTime";
+  String PENDING_SYNC_WAIT_TIME_DESC =
+    "Time a SYNC event waits between consumer pickup and fsync start, in 
nanoseconds";
 
   String SYNC_TO_SAF_TRANSITIONS = "syncToSafTransitions";
   String SYNC_TO_SAF_TRANSITIONS_DESC = "Number of SYNC to STORE_AND_FORWARD 
mode transitions";
@@ -78,6 +94,30 @@ public interface MetricsReplicationLogGroupSource extends 
BaseSource {
    */
   void updateRingBufferTime(long timeNs);
 
+  /**
+   * Update the time taken for the underlying filesystem sync (fsync) in 
nanoseconds.
+   * @param timeNs Time taken in nanoseconds
+   */
+  void updateFsSyncTime(long timeNs);
+
+  /**
+   * Update the number of events drained in a single Disruptor batch.
+   * @param size Number of events in the batch
+   */
+  void updateBatchSize(long size);
+
+  /**
+   * Update the number of pending sync futures coalesced into one fsync.
+   * @param count Number of sync futures
+   */
+  void updatePendingSyncCount(long count);
+
+  /**
+   * Update the time a SYNC event waited between consumer pickup and fsync 
start.
+   * @param timeNs Time in nanoseconds
+   */
+  void updatePendingSyncWaitTime(long timeNs);
+
   /**
    * Increments the counter for log rotation failures. This counter tracks the 
number of times log
    * rotation has failed.
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
index ea552b47cd..7fe679075d 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
@@ -33,6 +33,10 @@ public class MetricsReplicationLogGroupSourceImpl extends 
BaseSourceImpl
   private final MutableHistogram syncTime;
   private final MutableHistogram rotationTime;
   private final MutableHistogram ringBufferTime;
+  private final MutableHistogram fsSyncTime;
+  private final MutableHistogram batchSize;
+  private final MutableHistogram pendingSyncCount;
+  private final MutableHistogram pendingSyncWaitTime;
 
   public MetricsReplicationLogGroupSourceImpl(String haGroupName) {
     this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, 
METRICS_JMX_CONTEXT, haGroupName);
@@ -51,6 +55,12 @@ public class MetricsReplicationLogGroupSourceImpl extends 
BaseSourceImpl
     syncTime = getMetricsRegistry().newHistogram(SYNC_TIME, SYNC_TIME_DESC);
     rotationTime = getMetricsRegistry().newHistogram(ROTATION_TIME, 
ROTATION_TIME_DESC);
     ringBufferTime = getMetricsRegistry().newHistogram(RING_BUFFER_TIME, 
RING_BUFFER_TIME_DESC);
+    fsSyncTime = getMetricsRegistry().newHistogram(FS_SYNC_TIME, 
FS_SYNC_TIME_DESC);
+    batchSize = getMetricsRegistry().newHistogram(BATCH_SIZE, BATCH_SIZE_DESC);
+    pendingSyncCount =
+      getMetricsRegistry().newHistogram(PENDING_SYNC_COUNT, 
PENDING_SYNC_COUNT_DESC);
+    pendingSyncWaitTime =
+      getMetricsRegistry().newHistogram(PENDING_SYNC_WAIT_TIME, 
PENDING_SYNC_WAIT_TIME_DESC);
   }
 
   @Override
@@ -93,11 +103,35 @@ public class MetricsReplicationLogGroupSourceImpl extends 
BaseSourceImpl
     ringBufferTime.add(timeNs);
   }
 
+  @Override
+  public void updateFsSyncTime(long timeNs) {
+    fsSyncTime.add(timeNs);
+  }
+
+  @Override
+  public void updateBatchSize(long size) {
+    batchSize.add(size);
+  }
+
+  @Override
+  public void updatePendingSyncCount(long count) {
+    pendingSyncCount.add(count);
+  }
+
+  @Override
+  public void updatePendingSyncWaitTime(long timeNs) {
+    pendingSyncWaitTime.add(timeNs);
+  }
+
   @Override
   public ReplicationLogMetricValues getCurrentMetricValues() {
-    return new ReplicationLogMetricValues(rotationCount.value(), 
rotationFailuresCount.value(),
-      syncToSafTransitions.value(), appendTime.getMax(), syncTime.getMax(), 
rotationTime.getMax(),
-      ringBufferTime.getMax());
+    return 
ReplicationLogMetricValues.builder().rotationCount(rotationCount.value())
+      .rotationFailuresCount(rotationFailuresCount.value())
+      
.syncToSafTransitions(syncToSafTransitions.value()).appendTime(appendTime.getMax())
+      .syncTime(syncTime.getMax()).rotationTime(rotationTime.getMax())
+      .ringBufferTime(ringBufferTime.getMax()).fsSyncTime(fsSyncTime.getMax())
+      
.batchSize(batchSize.getMax()).pendingSyncCount(pendingSyncCount.getMax())
+      .pendingSyncWaitTime(pendingSyncWaitTime.getMax()).build();
   }
 
   @Override
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
index ac1aab4e20..3fa79d1d02 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
@@ -27,17 +27,27 @@ public class ReplicationLogMetricValues {
   private final long syncTime;
   private final long rotationTime;
   private final long ringBufferTime;
+  private final long fsSyncTime;
+  private final long batchSize;
+  private final long pendingSyncCount;
+  private final long pendingSyncWaitTime;
 
-  public ReplicationLogMetricValues(long rotationCount, long 
rotationFailuresCount,
-    long syncToSafTransitions, long appendTime, long syncTime, long 
rotationTime,
-    long ringBufferTime) {
-    this.rotationCount = rotationCount;
-    this.rotationFailuresCount = rotationFailuresCount;
-    this.syncToSafTransitions = syncToSafTransitions;
-    this.appendTime = appendTime;
-    this.syncTime = syncTime;
-    this.rotationTime = rotationTime;
-    this.ringBufferTime = ringBufferTime;
+  private ReplicationLogMetricValues(Builder b) {
+    this.rotationCount = b.rotationCount;
+    this.rotationFailuresCount = b.rotationFailuresCount;
+    this.syncToSafTransitions = b.syncToSafTransitions;
+    this.appendTime = b.appendTime;
+    this.syncTime = b.syncTime;
+    this.rotationTime = b.rotationTime;
+    this.ringBufferTime = b.ringBufferTime;
+    this.fsSyncTime = b.fsSyncTime;
+    this.batchSize = b.batchSize;
+    this.pendingSyncCount = b.pendingSyncCount;
+    this.pendingSyncWaitTime = b.pendingSyncWaitTime;
+  }
+
+  public static Builder builder() {
+    return new Builder();
   }
 
   public long getRotationCount() {
@@ -68,4 +78,92 @@ public class ReplicationLogMetricValues {
     return ringBufferTime;
   }
 
+  public long getFsSyncTime() {
+    return fsSyncTime;
+  }
+
+  public long getBatchSize() {
+    return batchSize;
+  }
+
+  public long getPendingSyncCount() {
+    return pendingSyncCount;
+  }
+
+  public long getPendingSyncWaitTime() {
+    return pendingSyncWaitTime;
+  }
+
+  public static class Builder {
+    private long rotationCount;
+    private long rotationFailuresCount;
+    private long syncToSafTransitions;
+    private long appendTime;
+    private long syncTime;
+    private long rotationTime;
+    private long ringBufferTime;
+    private long fsSyncTime;
+    private long batchSize;
+    private long pendingSyncCount;
+    private long pendingSyncWaitTime;
+
+    public Builder rotationCount(long v) {
+      this.rotationCount = v;
+      return this;
+    }
+
+    public Builder rotationFailuresCount(long v) {
+      this.rotationFailuresCount = v;
+      return this;
+    }
+
+    public Builder syncToSafTransitions(long v) {
+      this.syncToSafTransitions = v;
+      return this;
+    }
+
+    public Builder appendTime(long v) {
+      this.appendTime = v;
+      return this;
+    }
+
+    public Builder syncTime(long v) {
+      this.syncTime = v;
+      return this;
+    }
+
+    public Builder rotationTime(long v) {
+      this.rotationTime = v;
+      return this;
+    }
+
+    public Builder ringBufferTime(long v) {
+      this.ringBufferTime = v;
+      return this;
+    }
+
+    public Builder fsSyncTime(long v) {
+      this.fsSyncTime = v;
+      return this;
+    }
+
+    public Builder batchSize(long v) {
+      this.batchSize = v;
+      return this;
+    }
+
+    public Builder pendingSyncCount(long v) {
+      this.pendingSyncCount = v;
+      return this;
+    }
+
+    public Builder pendingSyncWaitTime(long v) {
+      this.pendingSyncWaitTime = v;
+      return this;
+    }
+
+    public ReplicationLogMetricValues build() {
+      return new ReplicationLogMetricValues(this);
+    }
+  }
 }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
index c0172274c7..d70dabeb47 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
@@ -67,6 +67,7 @@ import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility;
 import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.query.PhoenixTestBuilder;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.replication.metrics.ReplicationLogMetricValues;
 import org.apache.phoenix.replication.reader.ReplicationLogProcessor;
 import org.apache.phoenix.replication.tool.LogFileAnalyzer;
 import org.apache.phoenix.util.TestUtil;
@@ -178,6 +179,22 @@ public class ReplicationLogGroupIT extends HABaseIT {
     }
   }
 
+  private void assertMetricsEmitted() {
+    ReplicationLogMetricValues values = 
logGroup.getMetrics().getCurrentMetricValues();
+    assertTrue("appendTime should be > 0, got " + values.getAppendTime(),
+      values.getAppendTime() > 0);
+    assertTrue("syncTime should be > 0, got " + values.getSyncTime(), 
values.getSyncTime() > 0);
+    assertTrue("ringBufferTime should be > 0, got " + 
values.getRingBufferTime(),
+      values.getRingBufferTime() > 0);
+    assertTrue("fsSyncTime should be > 0, got " + values.getFsSyncTime(),
+      values.getFsSyncTime() > 0);
+    assertTrue("batchSize should be > 0, got " + values.getBatchSize(), 
values.getBatchSize() > 0);
+    assertTrue("pendingSyncCount should be > 0, got " + 
values.getPendingSyncCount(),
+      values.getPendingSyncCount() > 0);
+    assertTrue("pendingSyncWaitTime should be > 0, got " + 
values.getPendingSyncWaitTime(),
+      values.getPendingSyncWaitTime() > 0);
+  }
+
   private void dumpTableLogCount(Map<String, List<Mutation>> mutationsByTable) 
{
     LOG.info("Dump table log count for test {}", name.getMethodName());
     for (Map.Entry<String, List<Mutation>> table : 
mutationsByTable.entrySet()) {
@@ -304,6 +321,11 @@ public class ReplicationLogGroupIT extends HABaseIT {
         }
       }
 
+      // Sanity-check that producer- and consumer-side metrics fired at least once on the haGroup.
+      // This guards against the rotationTimeMs-style bug where a metric is declared but never
+      // emitted. Snapshot before verifyReplication() since it closes the log group.
+      assertMetricsEmitted();
+
       // verify replication mutation counts
       // mutation count will be equal to row count since the atomic upsert 
mutations will be
       // ignored and therefore not replicated

Reply via email to