This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
new 97d34ae608 PHOENIX-7877 More granular metrics for ReplicationLogWriter (#2492)
97d34ae608 is described below
commit 97d34ae608206b8a1e889151854acc8578bf2280
Author: tkhurana <[email protected]>
AuthorDate: Wed Jun 3 17:03:24 2026 -0700
PHOENIX-7877 More granular metrics for ReplicationLogWriter (#2492)
Adds histograms to decompose ReplicationLogGroup sync latency:
pendingSyncWaitTime measures the wait between consumer pickup of a SYNC event
and the start of fsync; fsSyncTime (renamed from modeSyncTime) measures the
underlying filesystem sync. Also adds the batchSize and pendingSyncCount
histograms (events drained per Disruptor batch, and pending sync futures
coalesced into one fsync).
Also instruments rotationTime in ReplicationLog so that the existing metric is
actually emitted. Introduces a builder for ReplicationLogMetricValues to avoid
positional-argument hazards as the metric set grows.
---
.../apache/phoenix/replication/ReplicationLog.java | 3 +
.../phoenix/replication/ReplicationLogGroup.java | 58 +++++++---
.../metrics/MetricsReplicationLogGroupSource.java | 48 ++++++++-
.../MetricsReplicationLogGroupSourceImpl.java | 40 ++++++-
.../metrics/ReplicationLogMetricValues.java | 118 +++++++++++++++++++--
.../phoenix/replication/ReplicationLogGroupIT.java | 22 ++++
6 files changed, 260 insertions(+), 29 deletions(-)
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
index 3949b3ef2b..410bd17ddb 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLog.java
@@ -437,6 +437,7 @@ public class ReplicationLog {
return;
}
boolean staged = false;
+ long startNs = System.nanoTime();
try {
LogFileWriter newWriter = createNewWriter();
LogFileWriter undrained = pendingWriter.getAndSet(newWriter);
@@ -458,6 +459,8 @@ public class ReplicationLog {
numFailures, maxRotationRetries, t);
}
} finally {
+ // Time both success and failure paths so slow rotations are visible even when they fail.
+ logGroup.getMetrics().updateRotationTime(System.nanoTime() - startNs);
if (onDemand) {
// Clear the flag last so requestRotation()'s CAS rejects duplicate
on-demand
// submissions while this task is still creating/staging a writer.
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
index c1e531461a..0d7a9f42e4 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogGroup.java
@@ -962,12 +962,27 @@ public class ReplicationLogGroup {
}
/**
- * Handles events from the Disruptor,
+ * Holder for a pending SYNC event captured at consumer pickup time. Stored separately from the
+ * LogEvent because Disruptor reuses ring buffer slots — retaining a LogEvent reference past the
+ * end of its onEvent invocation is unsafe.
*/
+ private static class PendingSync {
+ final CompletableFuture<Void> future;
+ final long pickupTimeNs;
+
+ PendingSync(CompletableFuture<Void> future, long pickupTimeNs) {
+ this.future = future;
+ this.pickupTimeNs = pickupTimeNs;
+ }
+ }
+
+ /** Handles events from the Disruptor. */
protected class LogEventHandler implements EventHandler<LogEvent>,
LifecycleAware {
- private final List<CompletableFuture<Void>> pendingSyncFutures = new
ArrayList<>();
+ private final List<PendingSync> pendingSyncs = new ArrayList<>();
private ReplicationModeImpl currentModeImpl;
private volatile IOException fatalException;
+ // Counts events drained per Disruptor batch. Single-threaded access from onEvent.
+ private int batchEventCount;
public LogEventHandler() {
}
@@ -1028,16 +1043,27 @@ public class ReplicationLogGroup {
* @throws IOException if the sync operation fails
*/
private void processPendingSyncs(long sequence) throws IOException {
- if (pendingSyncFutures.isEmpty()) {
+ int pendingSyncCount = pendingSyncs.size();
+ if (pendingSyncCount == 0) {
return;
}
+ metrics.updatePendingSyncCount(pendingSyncCount);
+ // Record per-event wait between SYNC pickup and fsync start.
+ long syncStartNs = System.nanoTime();
+ for (PendingSync ps : pendingSyncs) {
+ metrics.updatePendingSyncWaitTime(syncStartNs - ps.pickupTimeNs);
+ }
// call sync on the current mode
- currentModeImpl.sync();
+ try {
+ currentModeImpl.sync();
+ } finally {
+ metrics.updateFsSyncTime(System.nanoTime() - syncStartNs);
+ }
// Complete all pending sync futures
- for (CompletableFuture<Void> future : pendingSyncFutures) {
- future.complete(null);
+ for (PendingSync ps : pendingSyncs) {
+ ps.future.complete(null);
}
- pendingSyncFutures.clear();
+ pendingSyncs.clear();
LOG.debug("Sync operation completed successfully up to sequence {}",
sequence);
// after a successful sync check the mode set on the replication group
// Doing the mode check on sync points makes the implementation more
robust
@@ -1068,13 +1094,13 @@ public class ReplicationLogGroup {
* @param e The IOException that caused the failure
*/
private void failPendingSyncs(long sequence, IOException e) {
- if (pendingSyncFutures.isEmpty()) {
+ if (pendingSyncs.isEmpty()) {
return;
}
- for (CompletableFuture<Void> future : pendingSyncFutures) {
- future.completeExceptionally(e);
+ for (PendingSync ps : pendingSyncs) {
+ ps.future.completeExceptionally(e);
}
- pendingSyncFutures.clear();
+ pendingSyncs.clear();
LOG.warn("Failed to process syncs at sequence {}", sequence, e);
}
@@ -1145,6 +1171,7 @@ public class ReplicationLogGroup {
long currentTimeNs = System.nanoTime();
long ringBufferTimeNs = currentTimeNs - event.timestampNs;
metrics.updateRingBufferTime(ringBufferTimeNs);
+ batchEventCount++;
if (fatalException != null) {
// Append events are ignored; sync futures are failed immediately
// so producer threads unblock without waiting for the sync timeout.
@@ -1159,7 +1186,7 @@ public class ReplicationLogGroup {
currentModeImpl.append(event.record);
break;
case EVENT_TYPE_SYNC:
- pendingSyncFutures.add(event.syncFuture);
+ pendingSyncs.add(new PendingSync(event.syncFuture, currentTimeNs));
break;
case EVENT_TYPE_SWAP:
// Wake-up marker from LogRotationTask. Drain the staged writer so
the old writer is
@@ -1188,6 +1215,13 @@ public class ReplicationLogGroup {
new IOException("Unexpected error in event handler at sequence " +
sequence, t);
setFatalException(wrapped);
failPendingSyncs(sequence, wrapped);
+ } finally {
+ // Reset on endOfBatch regardless of success/failure so the counter never leaks
+ // across batches when an exception path bypasses processPendingSyncs.
+ if (endOfBatch) {
+ metrics.updateBatchSize(batchEventCount);
+ batchEventCount = 0;
+ }
}
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
index b81df30b5f..6c0392f2b7 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSource.java
@@ -33,17 +33,33 @@ public interface MetricsReplicationLogGroupSource extends
BaseSource {
String ROTATION_FAILURES = "rotationFailures";
String ROTATION_FAILURES_DESC = "Number of times log rotation has failed";
- String APPEND_TIME = "appendTimeMs";
+
+ // All time histograms in this source are nanoseconds.
+ String APPEND_TIME = "appendTime";
String APPEND_TIME_DESC = "Histogram of time taken for append operations in
nanoseconds";
- String SYNC_TIME = "syncTimeMs";
+ String SYNC_TIME = "syncTime";
String SYNC_TIME_DESC = "Histogram of time taken for sync operations in
nanoseconds";
- String ROTATION_TIME = "rotationTimeMs";
+ String ROTATION_TIME = "rotationTime";
String ROTATION_TIME_DESC = "Histogram of time taken for log rotations in
nanoseconds";
String RING_BUFFER_TIME = "ringBufferTime";
- String RING_BUFFER_TIME_DESC = "Time events spend in the ring buffer";
+ String RING_BUFFER_TIME_DESC = "Time events spend in the ring buffer in
nanoseconds";
+
+ String FS_SYNC_TIME = "fsSyncTime";
+ String FS_SYNC_TIME_DESC =
+ "Histogram of time taken for the underlying filesystem sync (fsync) in
nanoseconds";
+
+ String BATCH_SIZE = "batchSize";
+ String BATCH_SIZE_DESC = "Histogram of number of events drained per
Disruptor batch";
+
+ String PENDING_SYNC_COUNT = "pendingSyncCount";
+ String PENDING_SYNC_COUNT_DESC = "Histogram of pending sync futures
coalesced into one fsync";
+
+ String PENDING_SYNC_WAIT_TIME = "pendingSyncWaitTime";
+ String PENDING_SYNC_WAIT_TIME_DESC =
+ "Time a SYNC event waits between consumer pickup and fsync start, in
nanoseconds";
String SYNC_TO_SAF_TRANSITIONS = "syncToSafTransitions";
String SYNC_TO_SAF_TRANSITIONS_DESC = "Number of SYNC to STORE_AND_FORWARD
mode transitions";
@@ -78,6 +94,30 @@ public interface MetricsReplicationLogGroupSource extends
BaseSource {
*/
void updateRingBufferTime(long timeNs);
+ /**
+ * Update the time taken for the underlying filesystem sync (fsync) in
nanoseconds.
+ * @param timeNs Time taken in nanoseconds
+ */
+ void updateFsSyncTime(long timeNs);
+
+ /**
+ * Update the number of events drained in a single Disruptor batch.
+ * @param size Number of events in the batch
+ */
+ void updateBatchSize(long size);
+
+ /**
+ * Update the number of pending sync futures coalesced into one fsync.
+ * @param count Number of sync futures
+ */
+ void updatePendingSyncCount(long count);
+
+ /**
+ * Update the time a SYNC event waited between consumer pickup and fsync
start.
+ * @param timeNs Time in nanoseconds
+ */
+ void updatePendingSyncWaitTime(long timeNs);
+
/**
* Increments the counter for log rotation failures. This counter tracks the
number of times log
* rotation has failed.
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
index ea552b47cd..7fe679075d 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/MetricsReplicationLogGroupSourceImpl.java
@@ -33,6 +33,10 @@ public class MetricsReplicationLogGroupSourceImpl extends
BaseSourceImpl
private final MutableHistogram syncTime;
private final MutableHistogram rotationTime;
private final MutableHistogram ringBufferTime;
+ private final MutableHistogram fsSyncTime;
+ private final MutableHistogram batchSize;
+ private final MutableHistogram pendingSyncCount;
+ private final MutableHistogram pendingSyncWaitTime;
public MetricsReplicationLogGroupSourceImpl(String haGroupName) {
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT,
METRICS_JMX_CONTEXT, haGroupName);
@@ -51,6 +55,12 @@ public class MetricsReplicationLogGroupSourceImpl extends
BaseSourceImpl
syncTime = getMetricsRegistry().newHistogram(SYNC_TIME, SYNC_TIME_DESC);
rotationTime = getMetricsRegistry().newHistogram(ROTATION_TIME,
ROTATION_TIME_DESC);
ringBufferTime = getMetricsRegistry().newHistogram(RING_BUFFER_TIME,
RING_BUFFER_TIME_DESC);
+ fsSyncTime = getMetricsRegistry().newHistogram(FS_SYNC_TIME,
FS_SYNC_TIME_DESC);
+ batchSize = getMetricsRegistry().newHistogram(BATCH_SIZE, BATCH_SIZE_DESC);
+ pendingSyncCount =
+ getMetricsRegistry().newHistogram(PENDING_SYNC_COUNT,
PENDING_SYNC_COUNT_DESC);
+ pendingSyncWaitTime =
+ getMetricsRegistry().newHistogram(PENDING_SYNC_WAIT_TIME,
PENDING_SYNC_WAIT_TIME_DESC);
}
@Override
@@ -93,11 +103,35 @@ public class MetricsReplicationLogGroupSourceImpl extends
BaseSourceImpl
ringBufferTime.add(timeNs);
}
+ @Override
+ public void updateFsSyncTime(long timeNs) {
+ fsSyncTime.add(timeNs);
+ }
+
+ @Override
+ public void updateBatchSize(long size) {
+ batchSize.add(size);
+ }
+
+ @Override
+ public void updatePendingSyncCount(long count) {
+ pendingSyncCount.add(count);
+ }
+
+ @Override
+ public void updatePendingSyncWaitTime(long timeNs) {
+ pendingSyncWaitTime.add(timeNs);
+ }
+
@Override
public ReplicationLogMetricValues getCurrentMetricValues() {
- return new ReplicationLogMetricValues(rotationCount.value(),
rotationFailuresCount.value(),
- syncToSafTransitions.value(), appendTime.getMax(), syncTime.getMax(),
rotationTime.getMax(),
- ringBufferTime.getMax());
+ return
ReplicationLogMetricValues.builder().rotationCount(rotationCount.value())
+ .rotationFailuresCount(rotationFailuresCount.value())
+
.syncToSafTransitions(syncToSafTransitions.value()).appendTime(appendTime.getMax())
+ .syncTime(syncTime.getMax()).rotationTime(rotationTime.getMax())
+ .ringBufferTime(ringBufferTime.getMax()).fsSyncTime(fsSyncTime.getMax())
+
.batchSize(batchSize.getMax()).pendingSyncCount(pendingSyncCount.getMax())
+ .pendingSyncWaitTime(pendingSyncWaitTime.getMax()).build();
}
@Override
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
index ac1aab4e20..3fa79d1d02 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/metrics/ReplicationLogMetricValues.java
@@ -27,17 +27,27 @@ public class ReplicationLogMetricValues {
private final long syncTime;
private final long rotationTime;
private final long ringBufferTime;
+ private final long fsSyncTime;
+ private final long batchSize;
+ private final long pendingSyncCount;
+ private final long pendingSyncWaitTime;
- public ReplicationLogMetricValues(long rotationCount, long
rotationFailuresCount,
- long syncToSafTransitions, long appendTime, long syncTime, long
rotationTime,
- long ringBufferTime) {
- this.rotationCount = rotationCount;
- this.rotationFailuresCount = rotationFailuresCount;
- this.syncToSafTransitions = syncToSafTransitions;
- this.appendTime = appendTime;
- this.syncTime = syncTime;
- this.rotationTime = rotationTime;
- this.ringBufferTime = ringBufferTime;
+ private ReplicationLogMetricValues(Builder b) {
+ this.rotationCount = b.rotationCount;
+ this.rotationFailuresCount = b.rotationFailuresCount;
+ this.syncToSafTransitions = b.syncToSafTransitions;
+ this.appendTime = b.appendTime;
+ this.syncTime = b.syncTime;
+ this.rotationTime = b.rotationTime;
+ this.ringBufferTime = b.ringBufferTime;
+ this.fsSyncTime = b.fsSyncTime;
+ this.batchSize = b.batchSize;
+ this.pendingSyncCount = b.pendingSyncCount;
+ this.pendingSyncWaitTime = b.pendingSyncWaitTime;
+ }
+
+ public static Builder builder() {
+ return new Builder();
}
public long getRotationCount() {
@@ -68,4 +78,92 @@ public class ReplicationLogMetricValues {
return ringBufferTime;
}
+ public long getFsSyncTime() {
+ return fsSyncTime;
+ }
+
+ public long getBatchSize() {
+ return batchSize;
+ }
+
+ public long getPendingSyncCount() {
+ return pendingSyncCount;
+ }
+
+ public long getPendingSyncWaitTime() {
+ return pendingSyncWaitTime;
+ }
+
+ public static class Builder {
+ private long rotationCount;
+ private long rotationFailuresCount;
+ private long syncToSafTransitions;
+ private long appendTime;
+ private long syncTime;
+ private long rotationTime;
+ private long ringBufferTime;
+ private long fsSyncTime;
+ private long batchSize;
+ private long pendingSyncCount;
+ private long pendingSyncWaitTime;
+
+ public Builder rotationCount(long v) {
+ this.rotationCount = v;
+ return this;
+ }
+
+ public Builder rotationFailuresCount(long v) {
+ this.rotationFailuresCount = v;
+ return this;
+ }
+
+ public Builder syncToSafTransitions(long v) {
+ this.syncToSafTransitions = v;
+ return this;
+ }
+
+ public Builder appendTime(long v) {
+ this.appendTime = v;
+ return this;
+ }
+
+ public Builder syncTime(long v) {
+ this.syncTime = v;
+ return this;
+ }
+
+ public Builder rotationTime(long v) {
+ this.rotationTime = v;
+ return this;
+ }
+
+ public Builder ringBufferTime(long v) {
+ this.ringBufferTime = v;
+ return this;
+ }
+
+ public Builder fsSyncTime(long v) {
+ this.fsSyncTime = v;
+ return this;
+ }
+
+ public Builder batchSize(long v) {
+ this.batchSize = v;
+ return this;
+ }
+
+ public Builder pendingSyncCount(long v) {
+ this.pendingSyncCount = v;
+ return this;
+ }
+
+ public Builder pendingSyncWaitTime(long v) {
+ this.pendingSyncWaitTime = v;
+ return this;
+ }
+
+ public ReplicationLogMetricValues build() {
+ return new ReplicationLogMetricValues(this);
+ }
+ }
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
index c0172274c7..d70dabeb47 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/replication/ReplicationLogGroupIT.java
@@ -67,6 +67,7 @@ import org.apache.phoenix.jdbc.HighAvailabilityTestingUtility;
import org.apache.phoenix.jdbc.PhoenixDriver;
import org.apache.phoenix.query.PhoenixTestBuilder;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.replication.metrics.ReplicationLogMetricValues;
import org.apache.phoenix.replication.reader.ReplicationLogProcessor;
import org.apache.phoenix.replication.tool.LogFileAnalyzer;
import org.apache.phoenix.util.TestUtil;
@@ -178,6 +179,22 @@ public class ReplicationLogGroupIT extends HABaseIT {
}
}
+ private void assertMetricsEmitted() {
+ ReplicationLogMetricValues values =
logGroup.getMetrics().getCurrentMetricValues();
+ assertTrue("appendTime should be > 0, got " + values.getAppendTime(),
+ values.getAppendTime() > 0);
+ assertTrue("syncTime should be > 0, got " + values.getSyncTime(),
values.getSyncTime() > 0);
+ assertTrue("ringBufferTime should be > 0, got " +
values.getRingBufferTime(),
+ values.getRingBufferTime() > 0);
+ assertTrue("fsSyncTime should be > 0, got " + values.getFsSyncTime(),
+ values.getFsSyncTime() > 0);
+ assertTrue("batchSize should be > 0, got " + values.getBatchSize(),
values.getBatchSize() > 0);
+ assertTrue("pendingSyncCount should be > 0, got " +
values.getPendingSyncCount(),
+ values.getPendingSyncCount() > 0);
+ assertTrue("pendingSyncWaitTime should be > 0, got " +
values.getPendingSyncWaitTime(),
+ values.getPendingSyncWaitTime() > 0);
+ }
+
private void dumpTableLogCount(Map<String, List<Mutation>> mutationsByTable)
{
LOG.info("Dump table log count for test {}", name.getMethodName());
for (Map.Entry<String, List<Mutation>> table :
mutationsByTable.entrySet()) {
@@ -304,6 +321,11 @@ public class ReplicationLogGroupIT extends HABaseIT {
}
}
+ // Sanity-check that producer- and consumer-side metrics fired at least once on the haGroup.
+ // This guards against the rotationTimeMs-style bug where a metric is declared but never
+ // emitted. Snapshot before verifyReplication() since it closes the log group.
+ assertMetricsEmitted();
+
// verify replication mutation counts
// mutation count will be equal to row count since the atomic upsert
mutations will be
// ignored and therefore not replicated