This is an automated email from the ASF dual-hosted git repository.

konstantinov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e6a97025bd Optimize Counter, Meter and Histogram metrics using thread 
local counters
e6a97025bd is described below

commit e6a97025bda9c52948c33095c10f5d1db89cb651
Author: Dmitry Konstantinov <[email protected]>
AuthorDate: Tue Aug 19 23:46:19 2025 +0100

    Optimize Counter, Meter and Histogram metrics using thread local counters
    
    Codahale metrics do not provide the ability to create custom metric 
implementations, so we have to inherit from Codahale classes.
    For better cache locality, rate and counter values are extracted to common 
thread-local arrays.
    Thread death is tracked using 2 approaches: the FastThreadLocal.onRemoval 
callback and phantom references to Thread objects.
    Phantom references are used to track liveness of metric users and to reuse 
metric IDs.
    
    Patch by Dmitry Konstantinov; reviewed by Benedict Elliott Smith for 
CASSANDRA-20250
---
 CHANGES.txt                                        |   1 +
 .../cassandra/metrics/AtomicLongCounter.java       |  69 ++++
 .../metrics/CassandraMetricsRegistry.java          |  49 ++-
 .../cassandra/metrics/ClearableHistogram.java      |  33 +-
 .../apache/cassandra/metrics/ClientMetrics.java    |   2 +-
 .../{CassandraHistogram.java => Counter.java}      |  27 +-
 .../{CassandraHistogram.java => Meter.java}        |  25 +-
 .../{CassandraHistogram.java => MetricClock.java}  |  23 +-
 .../cassandra/metrics/OverrideHistogram.java       |  24 +-
 .../cassandra/metrics/SnapshottingTimer.java       |   8 +-
 .../apache/cassandra/metrics/StorageMetrics.java   |   3 +-
 .../cassandra/metrics/ThreadLocalCounter.java      |  76 ++++
 .../cassandra/metrics/ThreadLocalHistogram.java    |  89 +++++
 .../apache/cassandra/metrics/ThreadLocalMeter.java | 378 ++++++++++++++++++++
 .../cassandra/metrics/ThreadLocalMetrics.java      | 394 +++++++++++++++++++++
 .../apache/cassandra/metrics/ThreadLocalTimer.java | 224 ++++++++++++
 src/java/org/apache/cassandra/metrics/Timer.java   |  88 +++++
 .../apache/cassandra/utils/ReflectionUtils.java    |  18 +
 .../cassandra/distributed/impl/Instance.java       |   4 +-
 .../cassandra/test/microbench/MetersBench.java     | 147 ++++++++
 .../test/microbench/ThreadLocalMetricsBench.java   | 134 +++++++
 .../metrics/JmxVirtualTableMetricsTest.java        |   2 +-
 .../cassandra/metrics/ThreadLocalCounterTest.java  | 112 ++++++
 .../metrics/ThreadLocalHistogramTest.java          |  34 +-
 .../cassandra/metrics/ThreadLocalMeterTest.java    | 231 ++++++++++++
 .../apache/cassandra/tools/OfflineToolUtils.java   |   1 +
 26 files changed, 2083 insertions(+), 113 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 512a574614..f8502538fb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Optimize Counter, Meter and Histogram metrics using thread local counters 
(CASSANDRA-20250)
  * Update snakeyaml to 2.4 (CASSANDRA-20928)
  * Update Netty to 4.1.125.Final (CASSANDRA-20925)
  * Expose uncaught exceptions in system_views.uncaught_exceptions table 
(CASSANDRA-20858)
diff --git a/src/java/org/apache/cassandra/metrics/AtomicLongCounter.java 
b/src/java/org/apache/cassandra/metrics/AtomicLongCounter.java
new file mode 100644
index 0000000000..0590663894
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/AtomicLongCounter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.utils.ReflectionUtils;
+
+/**
+ * This type of Counter is more efficient (from CPU and memory usage point of 
view)
+ *   than LongAdder-based {@link com.codahale.metrics.Counter} for non-highly 
contended write scenarios.
+ * It is also fast to read and should be used instead of {@link 
ThreadLocalCounter} when getCount performance is critical.
+ */
+public class AtomicLongCounter extends com.codahale.metrics.Counter implements 
Counter
+{
+    private final AtomicLong counter = new AtomicLong();
+
+    public AtomicLongCounter()
+    {
+        // to reduce metrics memory footprint
+        ReflectionUtils.setFieldToNull(this, 
com.codahale.metrics.Counter.class, "count");
+    }
+
+    @Override
+    public void inc()
+    {
+        counter.incrementAndGet();
+    }
+
+    @Override
+    public void inc(long n)
+    {
+        counter.addAndGet(n);
+    }
+
+    @Override
+    public void dec()
+    {
+        counter.decrementAndGet();
+    }
+
+    @Override
+    public void dec(long n)
+    {
+        counter.addAndGet(-n);
+    }
+
+    @Override
+    public long getCount()
+    {
+        return counter.get();
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java 
b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
index 4eecb75eac..756144c84d 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
+++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
@@ -172,8 +172,8 @@ public class CassandraMetricsRegistry extends MetricRegistry
             return Long.toString(((Counter) metric).getCount());
         else if (metric instanceof Gauge)
             return getGaugeValue((Gauge) metric);
-        else if (metric instanceof CassandraHistogram)
-            return Double.toString(((CassandraHistogram) 
metric).getSnapshot().getMedian());
+        else if (metric instanceof OverrideHistogram)
+            return Double.toString(((OverrideHistogram) 
metric).getSnapshot().getMedian());
         else if (metric instanceof Meter)
             return Long.toString(((Meter) metric).getCount());
         else if (metric instanceof Timer)
@@ -248,7 +248,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
                                                          "All metrics with 
type \"Histogram\"",
                                                          new 
HistogramMetricRowWalker(),
                                                          Metrics.getMetrics(),
-                                                         
CassandraHistogram.class::isInstance,
+                                                         
OverrideHistogram.class::isInstance,
                                                          
HistogramMetricRow::new))
                .add(createSinglePartitionedValueFiltered(VIRTUAL_METRICS,
                                                          "type_meter",
@@ -302,7 +302,26 @@ public class CassandraMetricsRegistry extends 
MetricRegistry
 
     public Counter counter(MetricName... name)
     {
-        Counter counter = super.counter(name[0].getMetricName());
+        String simpleMetricName = name[0].getMetricName();
+        Metric metric = super.getMetrics().get(simpleMetricName);
+        if (metric instanceof Counter)
+            return (Counter) metric;
+
+        Counter counter = new ThreadLocalCounter();
+        super.register(simpleMetricName, counter);
+        Stream.of(name).forEach(n -> register(n, counter));
+        return counter;
+    }
+
+    public Counter atomicLongCounter(MetricName... name)
+    {
+        String simpleMetricName = name[0].getMetricName();
+        Metric metric = super.getMetrics().get(simpleMetricName);
+        if (metric instanceof Counter)
+            return (Counter) metric;
+
+        Counter counter = new AtomicLongCounter();
+        super.register(simpleMetricName, counter);
         Stream.of(name).forEach(n -> register(n, counter));
         return counter;
     }
@@ -314,19 +333,25 @@ public class CassandraMetricsRegistry extends 
MetricRegistry
 
     public Meter meter(boolean gaugeCompatible, MetricName... name)
     {
-        Meter meter = super.meter(name[0].getMetricName());
+        String simpleMetricName = name[0].getMetricName();
+        Metric metric = super.getMetrics().get(simpleMetricName);
+        if (metric instanceof Meter)
+            return (Meter) metric;
+
+        Meter meter = new ThreadLocalMeter();
+        super.register(simpleMetricName, meter);
         Stream.of(name).forEach(n -> register(gaugeCompatible, n, meter));
         return meter;
     }
 
-    public CassandraHistogram histogram(MetricName name, boolean 
considerZeroes)
+    public OverrideHistogram histogram(MetricName name, boolean considerZeroes)
     {
         return register(name, new ClearableHistogram(new 
DecayingEstimatedHistogramReservoir(considerZeroes)));
     }
 
-    public CassandraHistogram histogram(MetricName name, MetricName alias, 
boolean considerZeroes)
+    public OverrideHistogram histogram(MetricName name, MetricName alias, 
boolean considerZeroes)
     {
-        CassandraHistogram histogram = histogram(name, considerZeroes);
+        OverrideHistogram histogram = histogram(name, considerZeroes);
         register(alias, histogram);
         return histogram;
     }
@@ -531,8 +556,8 @@ public class CassandraMetricsRegistry extends MetricRegistry
             mbean = new JmxGauge((Gauge<?>) metric, name);
         else if (metric instanceof Counter)
             mbean = new JmxCounter((Counter) metric, name);
-        else if (metric instanceof CassandraHistogram)
-            mbean = new JmxHistogram((CassandraHistogram) metric, name);
+        else if (metric instanceof OverrideHistogram)
+            mbean = new JmxHistogram((OverrideHistogram) metric, name);
         else if (metric instanceof Histogram)
             throw new UnsupportedOperationException("Must supply a 
CassandraHistogram");
         else if (metric instanceof Timer)
@@ -662,10 +687,10 @@ public class CassandraMetricsRegistry extends 
MetricRegistry
 
     private static class JmxHistogram extends AbstractBean implements 
JmxHistogramMBean
     {
-        final CassandraHistogram metric;
+        final OverrideHistogram metric;
         private long[] last = null;
 
-        private JmxHistogram(CassandraHistogram metric, ObjectName objectName)
+        private JmxHistogram(OverrideHistogram metric, ObjectName objectName)
         {
             super(objectName);
             this.metric = metric;
diff --git a/src/java/org/apache/cassandra/metrics/ClearableHistogram.java 
b/src/java/org/apache/cassandra/metrics/ClearableHistogram.java
index 66f3f9fd38..8d6749d2fd 100644
--- a/src/java/org/apache/cassandra/metrics/ClearableHistogram.java
+++ b/src/java/org/apache/cassandra/metrics/ClearableHistogram.java
@@ -17,16 +17,13 @@
  */
 package org.apache.cassandra.metrics;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.concurrent.atomic.LongAdder;
-
 import com.google.common.annotations.VisibleForTesting;
 
+
 /**
  * Adds ability to reset a histogram
  */
-public class ClearableHistogram extends CassandraHistogram
+public class ClearableHistogram extends ThreadLocalHistogram
 {
     private final DecayingEstimatedHistogramReservoir reservoirRef;
 
@@ -51,30 +48,6 @@ public class ClearableHistogram extends CassandraHistogram
 
     private void clearCount()
     {
-        // We have unfortunately no access to the count field so we need to 
use reflection to ensure that it is cleared.
-        // I hate that as it is fragile and pretty ugly but we only use that 
method for tests and it should fail pretty
-        // clearly if we start using an incompatible version of the metrics.
-        try
-        {
-            Field countField = 
com.codahale.metrics.Histogram.class.getDeclaredField("count");
-            countField.setAccessible(true);
-            // in 3.1 the counter object is a LongAdderAdapter which is a 
package private interface
-            // from com.codahale.metrics. In 4.0, it is a java LongAdder so 
the code will be simpler.
-            Object counter = countField.get(this);
-            if (counter instanceof LongAdder) // For com.codahale.metrics 
version >= 4.0
-            {
-                ((LongAdder) counter).reset();
-            }
-            else // 3.1 and 3.2
-            {
-                Method sumThenReset = 
counter.getClass().getDeclaredMethod("sumThenReset");
-                sumThenReset.setAccessible(true);
-                sumThenReset.invoke(counter);
-            }
-        }
-        catch (Exception e)
-        {
-            throw new IllegalStateException("Cannot reset the 
com.codahale.metrics.Histogram count. This might be due to a change of version 
of the metric library", e);
-        }
+        reset();
     }
 }
diff --git a/src/java/org/apache/cassandra/metrics/ClientMetrics.java 
b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
index 2ac00d293c..4bedb1808b 100644
--- a/src/java/org/apache/cassandra/metrics/ClientMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
@@ -199,7 +199,7 @@ public final class ClientMetrics
 
         CassandraReservoir ipUsageReservoir = 
ClientResourceLimits.ipUsageReservoir();
         
Metrics.register(factory.createMetricName("RequestsSizeByIpDistribution"),
-                         new CassandraHistogram(ipUsageReservoir)
+                         new OverrideHistogram(ipUsageReservoir)
         {
              public long getCount()
              {
diff --git a/src/java/org/apache/cassandra/metrics/CassandraHistogram.java 
b/src/java/org/apache/cassandra/metrics/Counter.java
similarity index 64%
copy from src/java/org/apache/cassandra/metrics/CassandraHistogram.java
copy to src/java/org/apache/cassandra/metrics/Counter.java
index 84d3df5a59..d1ce58dc9b 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraHistogram.java
+++ b/src/java/org/apache/cassandra/metrics/Counter.java
@@ -18,24 +18,19 @@
 
 package org.apache.cassandra.metrics;
 
-import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Counting;
+import com.codahale.metrics.Metric;
 
-public class CassandraHistogram extends Histogram
+/**
+ * An interface which mimics {@link com.codahale.metrics.Counter} API and 
allows alternative implementations
+ */
+public interface Counter extends Metric, Counting
 {
-    final CassandraReservoir reservoir;
-    public CassandraHistogram(CassandraReservoir reservoir)
-    {
-        super(reservoir);
-        this.reservoir = reservoir;
-    }
+    void inc();
+
+    void inc(long n);
 
-    public CassandraReservoir.BucketStrategy bucketStrategy()
-    {
-        return reservoir.bucketStrategy();
-    }
+    void dec();
 
-    public long[] bucketStarts(int length)
-    {
-        return reservoir.buckets(length);
-    }
+    void dec(long n);
 }
diff --git a/src/java/org/apache/cassandra/metrics/CassandraHistogram.java 
b/src/java/org/apache/cassandra/metrics/Meter.java
similarity index 63%
copy from src/java/org/apache/cassandra/metrics/CassandraHistogram.java
copy to src/java/org/apache/cassandra/metrics/Meter.java
index 84d3df5a59..cf4ef4e035 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraHistogram.java
+++ b/src/java/org/apache/cassandra/metrics/Meter.java
@@ -18,24 +18,13 @@
 
 package org.apache.cassandra.metrics;
 
-import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Metered;
 
-public class CassandraHistogram extends Histogram
+/**
+ * An interface which mimics {@link com.codahale.metrics.Meter} API and allows 
alternative implementations
+ */
+public interface Meter extends Metered
 {
-    final CassandraReservoir reservoir;
-    public CassandraHistogram(CassandraReservoir reservoir)
-    {
-        super(reservoir);
-        this.reservoir = reservoir;
-    }
-
-    public CassandraReservoir.BucketStrategy bucketStrategy()
-    {
-        return reservoir.bucketStrategy();
-    }
-
-    public long[] bucketStarts(int length)
-    {
-        return reservoir.buckets(length);
-    }
+    void mark(long n);
+    void mark();
 }
diff --git a/src/java/org/apache/cassandra/metrics/CassandraHistogram.java 
b/src/java/org/apache/cassandra/metrics/MetricClock.java
similarity index 66%
copy from src/java/org/apache/cassandra/metrics/CassandraHistogram.java
copy to src/java/org/apache/cassandra/metrics/MetricClock.java
index 84d3df5a59..4bbe893168 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraHistogram.java
+++ b/src/java/org/apache/cassandra/metrics/MetricClock.java
@@ -18,24 +18,27 @@
 
 package org.apache.cassandra.metrics;
 
-import com.codahale.metrics.Histogram;
+import java.util.concurrent.TimeUnit;
 
-public class CassandraHistogram extends Histogram
+import org.apache.cassandra.utils.Clock;
+
+public class MetricClock extends com.codahale.metrics.Clock
 {
-    final CassandraReservoir reservoir;
-    public CassandraHistogram(CassandraReservoir reservoir)
+    @Override
+    public long getTick()
     {
-        super(reservoir);
-        this.reservoir = reservoir;
+        return Clock.Global.nanoTime();
     }
 
-    public CassandraReservoir.BucketStrategy bucketStrategy()
+    public TimeUnit getTickUnit()
     {
-        return reservoir.bucketStrategy();
+        return TimeUnit.NANOSECONDS;
     }
 
-    public long[] bucketStarts(int length)
+    private static final MetricClock DEFAULT = new MetricClock();
+
+    public static MetricClock defaultClock()
     {
-        return reservoir.buckets(length);
+        return DEFAULT;
     }
 }
diff --git a/src/java/org/apache/cassandra/metrics/OverrideHistogram.java 
b/src/java/org/apache/cassandra/metrics/OverrideHistogram.java
index 36085433b7..2553d5e01f 100644
--- a/src/java/org/apache/cassandra/metrics/OverrideHistogram.java
+++ b/src/java/org/apache/cassandra/metrics/OverrideHistogram.java
@@ -21,7 +21,7 @@ package org.apache.cassandra.metrics;
 import com.codahale.metrics.Snapshot;
 import org.agrona.UnsafeAccess;
 
-public abstract class OverrideHistogram extends CassandraHistogram
+public abstract class OverrideHistogram extends com.codahale.metrics.Histogram
 {
     private static final CassandraReservoir NO_RESERVOIR = new 
CassandraReservoir() {
         @Override public Snapshot getPercentileSnapshot() { return null; }
@@ -34,10 +34,10 @@ public abstract class OverrideHistogram extends 
CassandraHistogram
 
     protected OverrideHistogram()
     {
-        super(NO_RESERVOIR);
+        this(NO_RESERVOIR);
         try
         {
-            UnsafeAccess.UNSAFE.putObject(this, 
UnsafeAccess.UNSAFE.objectFieldOffset(CassandraHistogram.class.getDeclaredField("count")),
 null);
+            UnsafeAccess.UNSAFE.putObject(this, 
UnsafeAccess.UNSAFE.objectFieldOffset(com.codahale.metrics.Histogram.class.getDeclaredField("count")),
 null);
         }
         catch (Throwable t)
         {
@@ -45,6 +45,20 @@ public abstract class OverrideHistogram extends 
CassandraHistogram
         }
     }
 
-    public abstract CassandraReservoir.BucketStrategy bucketStrategy();
-    public abstract long[] bucketStarts(int length);
+    final CassandraReservoir reservoir;
+    protected OverrideHistogram(CassandraReservoir reservoir)
+    {
+        super(reservoir);
+        this.reservoir = reservoir;
+    }
+
+    public CassandraReservoir.BucketStrategy bucketStrategy()
+    {
+        return reservoir.bucketStrategy();
+    }
+
+    public long[] bucketStarts(int length)
+    {
+        return reservoir.buckets(length);
+    }
 }
diff --git a/src/java/org/apache/cassandra/metrics/SnapshottingTimer.java 
b/src/java/org/apache/cassandra/metrics/SnapshottingTimer.java
index b15b9c8b07..d8986638d9 100644
--- a/src/java/org/apache/cassandra/metrics/SnapshottingTimer.java
+++ b/src/java/org/apache/cassandra/metrics/SnapshottingTimer.java
@@ -18,20 +18,18 @@
 
 package org.apache.cassandra.metrics;
 
-import com.codahale.metrics.Clock;
 import com.codahale.metrics.Snapshot;
-import com.codahale.metrics.Timer;
 
-public class SnapshottingTimer extends Timer
+public class SnapshottingTimer extends ThreadLocalTimer
 {
     private final CassandraReservoir reservoir;
     
     public SnapshottingTimer(CassandraReservoir reservoir)
     {
-        this(reservoir, Clock.defaultClock());
+        this(reservoir, MetricClock.defaultClock());
     }
 
-    public SnapshottingTimer(CassandraReservoir reservoir, Clock clock)
+    public SnapshottingTimer(CassandraReservoir reservoir, MetricClock clock)
     {
         super(reservoir, clock);
         this.reservoir = reservoir;
diff --git a/src/java/org/apache/cassandra/metrics/StorageMetrics.java 
b/src/java/org/apache/cassandra/metrics/StorageMetrics.java
index 1cd65c0896..24a1a6994e 100644
--- a/src/java/org/apache/cassandra/metrics/StorageMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/StorageMetrics.java
@@ -44,7 +44,8 @@ public class StorageMetrics
         createSummingGauge("UnreplicatedUncompressedLoad", metric -> 
metric.unreplicatedUncompressedLiveDiskSpaceUsed.getValue());
 
     public static final Counter uncaughtExceptions = 
Metrics.counter(factory.createMetricName("Exceptions"));
-    public static final Counter totalHintsInProgress  = 
Metrics.counter(factory.createMetricName("TotalHintsInProgress"));
+    // NOTE: we read totalHintsInProgress counter value as a part of write hot 
path, so an alternative implementation is used here
+    public static final Counter totalHintsInProgress  = 
Metrics.atomicLongCounter(factory.createMetricName("TotalHintsInProgress"));
     public static final Counter totalHints = 
Metrics.counter(factory.createMetricName("TotalHints"));
     public static final Counter repairExceptions = 
Metrics.counter(factory.createMetricName("RepairExceptions"));
     public static final Counter totalOpsForInvalidToken = 
Metrics.counter(factory.createMetricName("TotalOpsForInvalidToken"));
diff --git a/src/java/org/apache/cassandra/metrics/ThreadLocalCounter.java 
b/src/java/org/apache/cassandra/metrics/ThreadLocalCounter.java
new file mode 100644
index 0000000000..b579e0c6dd
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ThreadLocalCounter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import org.apache.cassandra.utils.ReflectionUtils;
+
+/**
+ * An alternative to Dropwizard Counter which implements the same kind of API.
+ * It has more efficient inc/dec operations and consumes less memory.
+ * The counter logic is implemented using {@link ThreadLocalMetrics} 
functionality.
+ *
+ * NOTE: Dropwizard Counter is a concrete class and there is no interface 
for Dropwizard Counter logic,
+ *   so we have to create an alternative hierarchy.
+*/
+public class ThreadLocalCounter extends com.codahale.metrics.Counter 
implements Counter
+{
+    private final int metricId;
+
+    ThreadLocalCounter(int metricId)
+    {
+        this.metricId = metricId;
+        ThreadLocalMetrics.destroyWhenUnreachable(this, metricId);
+        ReflectionUtils.setFieldToNull(this, 
com.codahale.metrics.Counter.class, "count"); // reduce metrics memory footprint
+    }
+
+    public ThreadLocalCounter()
+    {
+        this(ThreadLocalMetrics.allocateMetricId());
+    }
+
+    @Override
+    public void inc()
+    {
+        ThreadLocalMetrics.add(metricId, 1);
+    }
+
+    @Override
+    public void inc(long n)
+    {
+        ThreadLocalMetrics.add(metricId, n);
+    }
+
+    @Override
+    public void dec()
+    {
+        ThreadLocalMetrics.add(metricId, -1);
+    }
+
+    @Override
+    public void dec(long n)
+    {
+        ThreadLocalMetrics.add(metricId, -n);
+    }
+
+    @Override
+    public long getCount()
+    {
+        return ThreadLocalMetrics.getCount(metricId);
+    }
+}
diff --git a/src/java/org/apache/cassandra/metrics/ThreadLocalHistogram.java 
b/src/java/org/apache/cassandra/metrics/ThreadLocalHistogram.java
new file mode 100644
index 0000000000..9b32361d61
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ThreadLocalHistogram.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import com.codahale.metrics.Snapshot;
+
+/**
+ * An alternative to Dropwizard Histogram which implements the same kind of 
API.
+ * It has more efficient counting operations and consumes less memory.
+ * The counter logic is implemented using {@link ThreadLocalMetrics} 
functionality.
+ *
+ * NOTE: Dropwizard Histogram is a concrete class and there is no interface 
for Dropwizard Histogram logic,
+ *   so we have to create an alternative API hierarchy.
+ */
+public class ThreadLocalHistogram extends OverrideHistogram
+{
+    private final int countMetricId;
+
+    /**
+     * Creates a new {@link ThreadLocalHistogram} with the given reservoir.
+     *
+     * @param reservoir the reservoir to create a histogram from
+     */
+    public ThreadLocalHistogram(CassandraReservoir reservoir)
+    {
+        super(reservoir);
+        this.countMetricId = ThreadLocalMetrics.allocateMetricId();
+        ThreadLocalMetrics.destroyWhenUnreachable(this, countMetricId);
+    }
+
+    /**
+     * Adds a recorded value.
+     *
+     * @param value the length of the value
+     */
+    public void update(int value)
+    {
+        update((long) value);
+    }
+
+    /**
+     * Adds a recorded value.
+     *
+     * @param value the length of the value
+     */
+    public void update(long value)
+    {
+        ThreadLocalMetrics.add(countMetricId, 1);
+        reservoir.update(value);
+    }
+
+    /**
+     * Returns the number of values recorded.
+     *
+     * @return the number of values recorded
+     */
+    @Override
+    public long getCount()
+    {
+        return ThreadLocalMetrics.getCount(countMetricId);
+    }
+
+    public void reset()
+    {
+        ThreadLocalMetrics.getCountAndReset(countMetricId);
+    }
+
+    @Override
+    public Snapshot getSnapshot()
+    {
+        return reservoir.getSnapshot();
+    }
+}
diff --git a/src/java/org/apache/cassandra/metrics/ThreadLocalMeter.java 
b/src/java/org/apache/cassandra/metrics/ThreadLocalMeter.java
new file mode 100644
index 0000000000..2af9daf47c
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ThreadLocalMeter.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.codahale.metrics.Clock;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.utils.MonotonicClock;
+import org.apache.cassandra.utils.ReflectionUtils;
+
+import static java.lang.Math.exp;
+
+/**
+ * An alternative to Dropwizard Meter which implements the same kind of API.
+ * It has more efficient mark operations and consumes less memory.
+ * Only exponential decaying moving average is supported for 1/5/15-minutes 
rate values.
+ * Tick logic is moved out from a mark operation and always executed in a 
background thread.
+ * For better cache locality rate values are extracted to a common non 
thread-local array
+ *  updated by a background thread in bulk.
+ * Counter logic inside is implemented using {@link ThreadLocalMetrics} 
functionality.
+ * NOTE: Dropwizard Meter is a class and there is no interface for 
Dropwizard Meter logic,
+ *   so we have to create an alternative hierarchy.
+ */
+public class ThreadLocalMeter extends com.codahale.metrics.Meter implements 
Meter
+{
+    private static final int INTERVAL_SEC = 5;
+    private static final long TICK_INTERVAL_NS = 
TimeUnit.SECONDS.toNanos(INTERVAL_SEC);
+    private static final double SECONDS_PER_MINUTE = 60.0;
+
+    private static final double ONE_NS_IN_SEC = 1d / 
TimeUnit.SECONDS.toNanos(1);
+    private static final int ONE_MINUTE = 1;
+    private static final int FIVE_MINUTES = 5;
+    private static final int FIFTEEN_MINUTES = 15;
+    private static final double M1_ALPHA = 1 - exp(-INTERVAL_SEC / 
SECONDS_PER_MINUTE / ONE_MINUTE);
+    private static final double M5_ALPHA = 1 - exp(-INTERVAL_SEC / 
SECONDS_PER_MINUTE / FIVE_MINUTES);
+    private static final double M15_ALPHA = 1 - exp(-INTERVAL_SEC / 
SECONDS_PER_MINUTE / FIFTEEN_MINUTES);
+
+    // each meter instance stores rate stats using 3 consecutive cells in 
rates array
+    private static final int M1_RATE_OFFSET = 0;
+    private static final int M5_RATE_OFFSET = 1;
+    private static final int M15_RATE_OFFSET = 2;
+    private static final int RATES_COUNT = 3;
+    private static final double NON_INITIALIZED = Double.MIN_VALUE;
+
+    private static final int BACKGROUND_TICK_INTERVAL_SEC = INTERVAL_SEC;
+    private static final List<WeakReference<ThreadLocalMeter>> allMeters = new 
CopyOnWriteArrayList<>();
+
+    /**
+     * CASSANDRA-19332
+     * If ticking would reduce even Long.MAX_VALUE in the 15 minute EWMA below 
this target then don't bother
+     * ticking in a loop and instead reset all the EWMAs.
+     */
+    private static final double maxTickZeroTarget = 0.0001;
+    private static final int maxTicks;
+
+    static
+    {
+        int m3Ticks = 1;
+        double emulatedM15Rate = 0.0;
+        emulatedM15Rate = tickFifteenMinuteEWMA(emulatedM15Rate, 
Long.MAX_VALUE);
+        do
+        {
+            emulatedM15Rate = tickFifteenMinuteEWMA(emulatedM15Rate, 0);
+            m3Ticks++;
+        }
+        while (getRatePerSecond(emulatedM15Rate) > maxTickZeroTarget);
+        maxTicks = m3Ticks;
+    }
+
+    static
+    {
+        
ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(ThreadLocalMeter::tickAll,
+                                                                 
BACKGROUND_TICK_INTERVAL_SEC,
+                                                                 
BACKGROUND_TICK_INTERVAL_SEC,
+                                                                 
TimeUnit.SECONDS);
+    }
+
+    private static final Object ratesArrayGuard = new Object();
+
+    // writes should be synchronized using ratesArrayGuard
+    // 1) write during a copy when we need to extend rates array when a new 
Meter is created
+    // 2) write during a tick
+    private static volatile double[] rates = new double[RATES_COUNT * 16];
+    static final AtomicInteger rateGroupIdGenerator = new AtomicInteger();
+
+    // we recycle and reuse rate group IDs
+    // a set bit means the corresponding rate group id is available to use
+    private static final BitSet freeRateGroupIdSet = new BitSet();
+
+    private static int allocateRateGroupOffset()
+    {
+        int rateGroupId;
+        synchronized (freeRateGroupIdSet)
+        {
+            rateGroupId = freeRateGroupIdSet.nextSetBit(0);
+            if (rateGroupId >= 0)
+                freeRateGroupIdSet.clear(rateGroupId);
+        }
+        if (rateGroupId < 0)
+        {
+            rateGroupId = rateGroupIdGenerator.getAndAdd(RATES_COUNT);
+        }
+        synchronized (ratesArrayGuard)
+        {
+            if (rates.length < rateGroupId + RATES_COUNT)
+            {
+                double[] newRates = new double[rateGroupId + RATES_COUNT];
+                System.arraycopy(rates, 0, newRates, 0, rates.length);
+                rates = newRates;
+            }
+            rates[rateGroupId +  M1_RATE_OFFSET] = NON_INITIALIZED;
+            rates[rateGroupId +  M5_RATE_OFFSET] = NON_INITIALIZED;
+            rates[rateGroupId + M15_RATE_OFFSET] = NON_INITIALIZED;
+        }
+        return rateGroupId;
+    }
+
+    static double getRateValue(int offset)
+    {
+        return rates[offset];
+    }
+
+    private static void setRateValue(int offset, double value)
+    {
+        rates[offset] = value;
+    }
+
+    private final int countMetricId;
+    private final int uncountedMetricId;
+    private final int rateGroupId;
+    private final long startTime;
+    private final MonotonicClock clock;
+    private long lastTick;
+
+    public ThreadLocalMeter()
+    {
+        this(MonotonicClock.Global.approxTime);
+    }
+
+    public ThreadLocalMeter(MonotonicClock clock)
+    {
+        // movingAverages is set to null to reduce metrics memory footprint
+        super(null, Clock.defaultClock());
+        this.clock = clock;
+        this.startTime = this.clock.now();
+        this.lastTick = this.startTime;
+        this.countMetricId = ThreadLocalMetrics.allocateMetricId();
+        this.uncountedMetricId = ThreadLocalMetrics.allocateMetricId();
+        this.rateGroupId = allocateRateGroupOffset();
+        allMeters.add(new WeakReference<>(this));
+        ThreadLocalMetrics.destroyWhenUnreachable(this, new 
MeterCleaner(countMetricId, uncountedMetricId, rateGroupId));
+        ReflectionUtils.setFieldToNull(this, com.codahale.metrics.Meter.class, 
"count"); // to reduce metrics memory footprint
+    }
+
+    private static class MeterCleaner implements 
ThreadLocalMetrics.MetricCleaner
+    {
+        private final int countMetricId;
+        private final int uncountedMetricId;
+        private final int rateGroupId;
+
+        private MeterCleaner(int countMetricId, int uncountedMetricId, int 
rateGroupId)
+        {
+            this.countMetricId = countMetricId;
+            this.uncountedMetricId = uncountedMetricId;
+            this.rateGroupId = rateGroupId;
+        }
+
+        @Override
+        public void clean()
+        {
+            recycleRateGroupId(rateGroupId);
+            ThreadLocalMetrics.recycleMetricId(countMetricId);
+            ThreadLocalMetrics.recycleMetricId(uncountedMetricId);
+        }
+
+        private static void recycleRateGroupId(int rateGroupId)
+        {
+            synchronized (freeRateGroupIdSet)
+            {
+                freeRateGroupIdSet.set(rateGroupId);
+            }
+        }
+    }
+
+    /**
+     * Mark the occurrence of an event.
+     */
+    public void mark()
+    {
+        mark(1);
+    }
+
+    /**
+     * Mark the occurrence of a given number of events.
+     *
+     * @param n the number of events
+     */
+    public void mark(long n)
+    {
+        // we retrieve the context once here to reduce thread local lookup 
overheads
+        ThreadLocalMetrics context = ThreadLocalMetrics.get();
+        context.addNonStatic(countMetricId, n);
+        context.addNonStatic(uncountedMetricId, n);
+    }
+
+    @Override
+    public long getCount()
+    {
+        return ThreadLocalMetrics.getCount(countMetricId);
+    }
+
+    @Override
+    public double getFifteenMinuteRate()
+    {
+        return getRatePerSecond(getRateValue(rateGroupId + M15_RATE_OFFSET));
+    }
+
+    @Override
+    public double getFiveMinuteRate()
+    {
+        return getRatePerSecond(getRateValue(rateGroupId + M5_RATE_OFFSET));
+    }
+
+    @Override
+    public double getOneMinuteRate()
+    {
+        return getRatePerSecond(getRateValue(rateGroupId + M1_RATE_OFFSET));
+    }
+
+    @Override
+    public double getMeanRate()
+    {
+        long count = getCount();
+        if (count == 0)
+        {
+            return 0.0;
+        }
+        else
+        {
+            long elapsed = clock.now() - startTime;
+            return count / (elapsed * ONE_NS_IN_SEC);
+        }
+    }
+
+    @VisibleForTesting
+    static void tickAll()
+    {
+            List<WeakReference<ThreadLocalMeter>> emptyRefsToRemove = null;
+            for (WeakReference<ThreadLocalMeter> threadLocalMeterRef : 
allMeters)
+            {
+                ThreadLocalMeter meter = threadLocalMeterRef.get();
+                if (meter != null)
+                {
+                    meter.tickIfNessesary();
+                }
+                else
+                {
+                    if (emptyRefsToRemove == null)
+                        emptyRefsToRemove = new ArrayList<>();
+                    emptyRefsToRemove.add(threadLocalMeterRef);
+                }
+            }
+            if (emptyRefsToRemove != null)
+                allMeters.removeAll(emptyRefsToRemove);
+
+    }
+
+    private void tickIfNessesary()
+    {
+        long newTick = clock.now();
+        long age = newTick - lastTick;
+        if (age > TICK_INTERVAL_NS)
+        {
+            lastTick = newTick - age % TICK_INTERVAL_NS;
+            long requiredTicks = age / TICK_INTERVAL_NS;
+            tick(requiredTicks);
+        }
+    }
+    private void tick(long requiredTicks)
+    {
+        synchronized (ratesArrayGuard)
+        {
+            if (requiredTicks >= maxTicks)
+            {
+                reset();
+            }
+            else if (requiredTicks > 0)
+            {
+                long count = 
ThreadLocalMetrics.getCountAndReset(uncountedMetricId);
+                for (long i = 0; i < requiredTicks; i++)
+                {
+                    int m1Offset  = rateGroupId +  M1_RATE_OFFSET;
+                    int m5Offset  = rateGroupId +  M5_RATE_OFFSET;
+                    int m15Offset = rateGroupId + M15_RATE_OFFSET;
+                    double m1Rate  = getRateValue(m1Offset);
+                    double m5Rate  = getRateValue(m5Offset);
+                    double m15Rate = getRateValue(m15Offset);
+                    setRateValue(m1Offset,  tickOneMinuteEWMA(m1Rate, count));
+                    setRateValue(m5Offset,  tickFiveMinuteEWMA(m5Rate, count));
+                    setRateValue(m15Offset, tickFifteenMinuteEWMA(m15Rate, 
count));
+                    count = 0;
+                }
+            }
+        }
+    }
+
+    private static double tickOneMinuteEWMA(double oldRate, long count)
+    {
+        return tick(M1_ALPHA, oldRate, count);
+    }
+
+    private static double tickFiveMinuteEWMA(double oldRate, long count)
+    {
+        return tick(M5_ALPHA, oldRate, count);
+    }
+
+    private static double tickFifteenMinuteEWMA(double oldRate, long count)
+    {
+        return tick(M15_ALPHA, oldRate, count);
+    }
+
+    private static double tick(double alpha, double oldRate, long count)
+    {
+        double instantRate = (double) count / TICK_INTERVAL_NS;
+        if (oldRate != NON_INITIALIZED)
+            return oldRate + alpha * (instantRate - oldRate);
+        else // init
+            return instantRate;
+    }
+    private static double getRatePerSecond(double rate)
+    {
+        if (rate == NON_INITIALIZED)
+            rate = 0.0;
+        return rate * (double) TimeUnit.SECONDS.toNanos(1L);
+    }
+
+    /**
+     * Discards the pending uncounted events and sets all rates to {@link Double#MIN_NORMAL}. Used to avoid 
calling tick a large number of times.
+     */
+    private void reset()
+    {
+        ThreadLocalMetrics.getCountAndReset(uncountedMetricId);
+        setRateValue(rateGroupId +  M1_RATE_OFFSET, Double.MIN_NORMAL);
+        setRateValue(rateGroupId +  M5_RATE_OFFSET, Double.MIN_NORMAL);
+        setRateValue(rateGroupId + M15_RATE_OFFSET, Double.MIN_NORMAL);
+    }
+
+    @VisibleForTesting
+    static int getTickingMetersCount()
+    {
+        return allMeters.size();
+    }
+}
diff --git a/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java 
b/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java
new file mode 100644
index 0000000000..6f4afe6677
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ThreadLocalMetrics.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.concurrent.Shutdownable;
+
+import static com.google.common.collect.ImmutableList.of;
+import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static 
org.apache.cassandra.concurrent.InfiniteLoopExecutor.SimulatorSafe.UNSAFE;
+import static org.apache.cassandra.utils.ExecutorUtils.shutdownAndWait;
+
+/**
+ * A thread-local counter implementation designed for use in metrics as an 
alternative to LongAdder used by Dropwizard metrics.
+ * This implementation has reduced write (increment) CPU usage costs in 
exchange for a higher read cost.
+ * We keep and increment parts of a counter locally for each thread.
+ * To reduce memory footprint per counter they are grouped together to a 
long[] array for each thread.
+ * A position of a counter value is the same for every thread for the same 
counter id.
+ * Piggyback volatile visibility is expected for readers who execute getCount 
method to see recent writes to thread local arrays.
+ * If a metric is not used anymore the position in the array is reused. 
Phantom references are used to track aliveness of metric users.
+ * When a thread dies, the counter values accumulated by it are transferred to a 
shared summaryValues collection.
+ * Thread death is tracked using 2 approaches: FastThreadLocal.onRemoval 
callback and phantom references to Thread objects.
+ */
+public class ThreadLocalMetrics
+{
+    private static final int INITIAL_COUNTERS_CAPACITY = 16;
+
+    static final AtomicInteger idGenerator = new AtomicInteger();
+
+    private static final Object freeMetricIdSetGuard = new Object();
+
+    @VisibleForTesting
+    static final BitSet freeMetricIdSet = new BitSet();
+
+    private static final List<ThreadLocalMetrics> allThreadLocalMetrics = new 
CopyOnWriteArrayList<>();
+
+    /* the lock is used to coordinate the threads which:
+     * 1) transfer values from a dead thread to summaryValues
+     * 2) calculate a getCount value.
+     * Using this lock we want to avoid
+     *  losing a value that is being moved while getCount is running
+     *  as well as double-counting it
+     */
+    private static final ReadWriteLock summaryLock = new 
ReentrantReadWriteLock();
+
+    private static final FastThreadLocal<ThreadLocalMetrics> 
threadLocalMetricsCurrent = new FastThreadLocal<>()
+    {
+        @Override
+        protected ThreadLocalMetrics initialValue()
+        {
+            ThreadLocalMetrics result = new ThreadLocalMetrics();
+            allThreadLocalMetrics.add(result);
+            destroyWhenUnreachable(Thread.currentThread(), result::release);
+            return result;
+        }
+
+        // this method is invoked when a thread is going to finish, but it 
works only for FastThreadLocalThread
+        // so, we use phantom references for other cases
+        @Override
+        protected void onRemoval(ThreadLocalMetrics value)
+        {
+            value.release();
+        }
+    };
+
+    private static volatile AtomicLongArray summaryValues = new 
AtomicLongArray(INITIAL_COUNTERS_CAPACITY);
+
+    private static final Shutdownable cleaner;
+    private static final Set<PhantomReference<Object>> phantomReferences = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
+    private static final ReferenceQueue<Object> referenceQueue = new 
ReferenceQueue<>();
+
+    static
+    {
+        cleaner = executorFactory().infiniteLoop("ThreadLocalMetrics-Cleaner", 
ThreadLocalMetrics::cleanupOneReference, UNSAFE);
+    }
+
+    // we assume that counterValues can be only extended
+    // piggyback volatile visibility is expected for readers who execute 
getCount method to see recent writes
+    private long[] counterValues = new long[INITIAL_COUNTERS_CAPACITY];
+
+    private static void cleanupOneReference() throws InterruptedException
+    {
+        Object obj = referenceQueue.remove(100);
+        if (obj instanceof MetricIdReference)
+        {
+            ((MetricIdReference) obj).release();
+            phantomReferences.remove(obj);
+        }
+        else if (obj instanceof MetricCleanerReference)
+        {
+            ((MetricCleanerReference) obj).release();
+            phantomReferences.remove(obj);
+        }
+    }
+
+    private static class MetricIdReference extends PhantomReference<Object>
+    {
+        private final int metricId;
+
+        public MetricIdReference(Object referent, ReferenceQueue<? super 
Object> q, int metricId)
+        {
+            super(referent, q);
+            this.metricId = metricId;
+        }
+
+        public void release()
+        {
+            recycleMetricId(metricId);
+        }
+    }
+
+    private static class MetricCleanerReference extends 
PhantomReference<Object>
+    {
+        private final MetricCleaner metricCleaner;
+
+        public MetricCleanerReference(Object referent, ReferenceQueue<? super 
Object> q, MetricCleaner metricCleaner)
+        {
+            super(referent, q);
+            this.metricCleaner = metricCleaner;
+        }
+
+        public void release()
+        {
+            metricCleaner.clean();
+        }
+    }
+
+    interface MetricCleaner
+    {
+        void clean();
+    }
+
+    static void destroyWhenUnreachable(Object referent, int metricId)
+    {
+        phantomReferences.add(new MetricIdReference(referent, referenceQueue, 
metricId));
+    }
+
+    static void destroyWhenUnreachable(Object referent, MetricCleaner 
metricCleaner)
+    {
+        phantomReferences.add(new MetricCleanerReference(referent, 
referenceQueue, metricCleaner));
+    }
+
+    @VisibleForTesting
+    public static void shutdownCleaner(long timeout, TimeUnit unit) throws 
InterruptedException, TimeoutException
+    {
+        shutdownAndWait(timeout, unit, of(cleaner));
+    }
+
+    private void release()
+    {
+        // Using this lock while moving we want to avoid races with readers in 
getCount
+        // such races can cause a transferred value to be lost or double-counted 
by a reader
+        Lock lock = summaryLock.writeLock();
+        lock.lock();
+        try
+        {
+            // we may try to release ThreadLocalMetrics 2 times: onRemoval and 
by PhantomReference
+            // so this if check is needed to avoid a potential double release
+            if (allThreadLocalMetrics.remove(this))
+            {
+                for (int metricId = 0; metricId < counterValues.length; 
metricId++)
+                {
+                    long value = counterValues[metricId];
+                    if (value != 0)
+                        updateSummary(metricId, value);
+                }
+            }
+        }
+        finally
+        {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * If we already have ThreadLocalMetrics instance looked up for the 
current thread
+     * we can use this method to avoid thread local lookup costs.
+     * It can be used if you need to update several counters at the same time.
+     * @param metricId metric to add a value
+     * @param n value to add, can be a negative number as well
+     */
+    public void addNonStatic(int metricId, long n)
+    {
+        getNonStatic(metricId)[metricId] += n;
+    }
+
+    public static void add(int metricId, long n)
+    {
+        get(metricId)[metricId] += n;
+    }
+
+    private static long getCount(int metricId, boolean resetToZero)
+    {
+        long result;
+        Lock readLock = summaryLock.readLock();
+        readLock.lock();
+        try
+        {
+            result = getSummaryValue(metricId);
+            for (ThreadLocalMetrics threadLocalMetrics : allThreadLocalMetrics)
+            {
+                long count = 0;
+                long[] currentCounterValues = threadLocalMetrics.counterValues;
+                // currentCounterValues is extended for a thread when a value 
for metricId is reported in the thread
+                if (metricId < currentCounterValues.length)
+                    count = currentCounterValues[metricId];
+                result += count;
+            }
+            if (resetToZero)
+                updateSummary(metricId, -result); // compensative reset 
without writing to thread local values
+        }
+        finally
+        {
+            readLock.unlock();
+        }
+        return result;
+    }
+
+    // must be executed under summaryLock
+    private static long getSummaryValue(int metricId)
+    {
+        return summaryValues.get(metricId);
+    }
+
+    // must be executed under summaryLock
+    private static void updateSummary(int metricId, long value)
+    {
+        summaryValues.getAndAdd(metricId, value);
+    }
+
+    public static long getCount(int metricId)
+    {
+        return getCount(metricId, false);
+    }
+
+    public static long getCountAndReset(int metricId)
+    {
+        return getCount(metricId, true);
+    }
+
+    public static ThreadLocalMetrics get() {
+        return threadLocalMetricsCurrent.get();
+    }
+
+    private static long[] get(int metricId)
+    {
+        ThreadLocalMetrics threadLocalMetrics = ThreadLocalMetrics.get();
+        return threadLocalMetrics.getNonStatic(metricId);
+    }
+
+    private long[] getNonStatic(int metricId)
+    {
+        long[] currentCounterValues = counterValues;
+        if (metricId < currentCounterValues.length)
+            return currentCounterValues;
+
+        long[] newCounterValues = new long[calculateNewCapacity(metricId)];
+        // to avoid a race condition with a metric value reset within 
recycleMetricId logic
+        synchronized (this)
+        {
+            System.arraycopy(currentCounterValues, 0, newCounterValues, 0, 
currentCounterValues.length);
+            counterValues = newCounterValues;
+            return newCounterValues;
+        }
+    }
+
+    private static int calculateNewCapacity(int metricId)
+    {
+        return Math.max(metricId + 1, (int)(metricId * 1.1) );
+    }
+
+    static int allocateMetricId()
+    {
+        int metricId;
+        synchronized (freeMetricIdSetGuard)
+        {
+            metricId = freeMetricIdSet.nextSetBit(0);
+            if (metricId >= 0)
+                freeMetricIdSet.clear(metricId);
+        }
+        if (metricId < 0)
+            metricId = idGenerator.getAndIncrement();
+
+        if (metricId >= summaryValues.length()) // double-checked locking
+        {
+            Lock lock = summaryLock.writeLock();
+            lock.lock();
+            try
+            {
+                if (metricId >= summaryValues.length())
+                {
+                    AtomicLongArray newSummaryValues = new 
AtomicLongArray(calculateNewCapacity(metricId));
+                    for (int i = 0; i < summaryValues.length(); i++)
+                        newSummaryValues.set(i, summaryValues.get(i));
+                    summaryValues = newSummaryValues;
+                }
+            }
+            finally
+            {
+                lock.unlock();
+            }
+        }
+        return metricId;
+    }
+
+    static void recycleMetricId(int metricId)
+    {
+        // we use lock here to avoid potential issues when a metric is 
being released and a thread is detected as dead at the same time
+        // in this case we may clean a summary value and later the thread 
removal logic may re-add a non-zero summary value
+        Lock lock = summaryLock.writeLock();
+        lock.lock();
+        try
+        {
+            for (ThreadLocalMetrics threadLocalMetrics : allThreadLocalMetrics)
+            {
+                // to avoid a race condition with counterValues array 
extension by a metric updating thread within getNonStatic
+                synchronized (threadLocalMetrics)
+                {
+                    long[] currentCounterValues = 
threadLocalMetrics.counterValues;
+                    if (metricId < currentCounterValues.length)
+                        currentCounterValues[metricId] = 0;
+                }
+            }
+            summaryValues.set(metricId, 0);
+        }
+        finally
+        {
+            lock.unlock();
+        }
+
+        // there is no obvious happens-before relation between the 
currentCounterValues[metricId] = 0 write we just did
+        // and an initial read of the entry by a thread which updates the 
reused metric
+        // as a workaround we introduce a delay in recycling to provide the 
write visibility in practice
+        //  even if it is not formally guaranteed by the JMM
+        ScheduledExecutors.scheduledTasks.schedule(() -> {
+            synchronized (freeMetricIdSetGuard)
+            {
+                freeMetricIdSet.set(metricId);
+            }
+        }, 5, TimeUnit.SECONDS);
+    }
+
+    @VisibleForTesting
+    static int getAllocatedMetricsCount()
+    {
+        int freeCount;
+        synchronized (freeMetricIdSetGuard)
+        {
+            freeCount = freeMetricIdSet.cardinality();
+        }
+        return idGenerator.get() - freeCount;
+    }
+
+    @VisibleForTesting
+    static int getThreadLocalMetricsObjectsCount()
+    {
+        return allThreadLocalMetrics.size();
+    }
+}
diff --git a/src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java 
b/src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java
new file mode 100644
index 0000000000..771c68b04f
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ThreadLocalTimer.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.time.Duration;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Reservoir;
+import com.codahale.metrics.Snapshot;
+
+/**
+ * An alternative to Dropwizard Timer which implements the same kind of API.
+ * It has a more efficient latency histogram implementation and consumes less 
memory.
+ *
+ * NOTE: Dropwizard Timer is a concrete class and there is no interface for 
Dropwizard Timer logic,
+ *   so we have to create an alternative hierarchy.
+ */
+public class ThreadLocalTimer extends com.codahale.metrics.Timer implements 
Timer
+{
+    private final Meter meter;
+    private final ThreadLocalHistogram histogram;
+    // usually we need precise clocks for timing, so we do not replace it with 
an approximate version
+    private final MetricClock clock;
+
+    /**
+     * Creates a new {@link Timer} using a {@link 
DecayingEstimatedHistogramReservoir} (where Dropwizard's Timer defaults to an
+     * {@link ExponentiallyDecayingReservoir}) and the default {@link MetricClock}.
+     */
+    public ThreadLocalTimer()
+    {
+        this(new DecayingEstimatedHistogramReservoir());
+    }
+
+    /**
+     * Creates a new {@link Timer} that uses the given {@link Reservoir}.
+     *
+     * @param reservoir the {@link Reservoir} implementation the timer should 
use
+     */
+    public ThreadLocalTimer(CassandraReservoir reservoir)
+    {
+        this(reservoir, MetricClock.defaultClock());
+    }
+
+    /**
+     * Creates a new {@link Timer} that uses the given {@link Reservoir} and 
{@link MetricClock}.
+     *
+     * @param reservoir the {@link Reservoir} implementation the timer should 
use
+     * @param clock     the {@link MetricClock} implementation the timer 
should use
+     */
+    public ThreadLocalTimer(CassandraReservoir reservoir, MetricClock clock)
+    {
+        // the precise clock is intentionally not propagated to 
ThreadLocalMeter
+        // we do not need a precise and more expensive time within the meter
+        this(new ThreadLocalMeter(), new ThreadLocalHistogram(reservoir), 
clock);
+    }
+
+    public ThreadLocalTimer(Meter meter, ThreadLocalHistogram histogram, 
MetricClock clock)
+    {
+        // original Codahale meter and histogram are set to null to reduce 
memory footprint
+        super(null, null, clock);
+        this.meter = meter;
+        this.histogram = histogram;
+        this.clock = clock;
+    }
+
+    /**
+     * Adds a recorded duration.
+     *
+     * @param duration the length of the duration
+     * @param unit     the scale unit of {@code duration}
+     */
+    @Override
+    public void update(long duration, TimeUnit unit)
+    {
+        update(unit.toNanos(duration));
+    }
+
+    /**
+     * Adds a recorded duration.
+     *
+     * @param duration the {@link Duration} to add to the timer. Negative 
values are ignored.
+     */
+    @Override
+    public void update(Duration duration)
+    {
+        update(duration.toNanos());
+    }
+
+    /**
+     * Times and records the duration of event.
+     *
+     * @param event a {@link Callable} whose {@link Callable#call()} method 
implements a process
+     *              whose duration should be timed
+     * @param <T>   the type of the value returned by {@code event}
+     * @return the value returned by {@code event}
+     * @throws Exception if {@code event} throws an {@link Exception}
+     */
+    @Override
+    public <T> T time(Callable<T> event) throws Exception
+    {
+        final long startTime = clock.getTick();
+        try
+        {
+            return event.call();
+        }
+        finally
+        {
+            update(clock.getTick() - startTime);
+        }
+    }
+
+    /**
+     * Times and records the duration of event. Should not throw exceptions, 
for that use the
+     * {@link #time(Callable)} method.
+     *
+     * @param event a {@link Supplier} whose {@link Supplier#get()} method 
implements a process
+     *              whose duration should be timed
+     * @param <T>   the type of the value returned by {@code event}
+     * @return the value returned by {@code event}
+     */
+    @Override
+    public <T> T timeSupplier(Supplier<T> event)
+    {
+        final long startTime = clock.getTick();
+        try
+        {
+            return event.get();
+        }
+        finally
+        {
+            update(clock.getTick() - startTime);
+        }
+    }
+
+    /**
+     * Times and records the duration of event.
+     *
+     * @param event a {@link Runnable} whose {@link Runnable#run()} method 
implements a process
+     *              whose duration should be timed
+     */
+    @Override
+    public void time(Runnable event)
+    {
+        final long startTime = clock.getTick();
+        try
+        {
+            event.run();
+        }
+        finally
+        {
+            update(clock.getTick() - startTime);
+        }
+    }
+
+    @Override
+    public Timer.Context startTime()
+    {
+        return new Timer.Context(this, clock);
+    }
+
+    @Override
+    public long getCount()
+    {
+        return histogram.getCount();
+    }
+
+    @Override
+    public double getFifteenMinuteRate()
+    {
+        return meter.getFifteenMinuteRate();
+    }
+
+    @Override
+    public double getFiveMinuteRate()
+    {
+        return meter.getFiveMinuteRate();
+    }
+
+    @Override
+    public double getMeanRate()
+    {
+        return meter.getMeanRate();
+    }
+
+    @Override
+    public double getOneMinuteRate()
+    {
+        return meter.getOneMinuteRate();
+    }
+
+    @Override
+    public Snapshot getSnapshot()
+    {
+        return histogram.getSnapshot();
+    }
+
+    private void update(long duration)
+    {
+        if (duration >= 0)
+        {
+            histogram.update(duration);
+            meter.mark();
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/metrics/Timer.java 
b/src/java/org/apache/cassandra/metrics/Timer.java
new file mode 100644
index 0000000000..ed764c03c9
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/Timer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.time.Duration;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import com.codahale.metrics.Metered;
+import com.codahale.metrics.Sampling;
+
+/**
+ * An interface which mimics {@link com.codahale.metrics.Timer} API and allows 
alternative implementations
+ */
+public interface Timer extends Metered, Sampling
+{
+    /**
+     * A timing context.
+     */
+    class Context implements AutoCloseable
+    {
+        private final Timer timer;
+        private final MetricClock clock;
+        private final long startTime;
+
+        Context(Timer timer, MetricClock clock)
+        {
+            this.timer = timer;
+            this.clock = clock;
+            this.startTime = clock.getTick();
+        }
+
+        /**
+         * Updates the timer with the difference between current and start 
time. Call to this method will
+         * not reset the start time. Multiple calls result in multiple updates.
+         *
+         * @return the elapsed time in nanoseconds
+         */
+        public long stop()
+        {
+            final long elapsed = clock.getTick() - startTime;
+            timer.update(elapsed, clock.getTickUnit());
+            return elapsed;
+        }
+
+        /**
+         * Equivalent to calling {@link #stop()}.
+         */
+        @Override
+        public void close()
+        {
+            stop();
+        }
+    }
+
+    void update(long duration, TimeUnit unit);
+
+    void update(Duration duration);
+
+    <T> T time(Callable<T> event) throws Exception;
+
+    <T> T timeSupplier(Supplier<T> event);
+
+    void time(Runnable event);
+
+    /* we have to implement another method instead of time() for two reasons:
+     * 1) com.codahale.metrics.Timer.Context cannot be inherited - it has only 
a package-private constructor
+     * 2) we want to avoid a direct dependency on 
com.codahale.metrics.Timer.Context in other Cassandra classes
+     */
+    Context startTime();
+}
diff --git a/src/java/org/apache/cassandra/utils/ReflectionUtils.java 
b/src/java/org/apache/cassandra/utils/ReflectionUtils.java
index 289b89beb2..6f661ed396 100644
--- a/src/java/org/apache/cassandra/utils/ReflectionUtils.java
+++ b/src/java/org/apache/cassandra/utils/ReflectionUtils.java
@@ -105,4 +105,22 @@ public class ReflectionUtils
             throw new RuntimeException(String.format("Could not clear map 
field %s in class %s", mapName, clazz), ex);
         }
     }
+
+    public static <T> void setFieldToNull(T object, Class<T> declaringClass, 
String fieldName)
+    {
+        try
+        {
+            Field field = declaringClass.getDeclaredField(fieldName);
+            field.setAccessible(true);
+            field.set(object, null);
+        }
+        catch (NoSuchFieldException e)
+        {
+            throw new RuntimeException(String.format("Could not find field %s 
in %s", fieldName, object.getClass()), e);
+        }
+        catch (IllegalAccessException e)
+        {
+            throw new RuntimeException(String.format("Could not set field %s 
in %s", fieldName, object.getClass()), e);
+        }
+    }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 70743f7b06..6147a3fdd3 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -116,6 +116,7 @@ import org.apache.cassandra.io.util.PathUtils;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 import org.apache.cassandra.metrics.Sampler;
+import org.apache.cassandra.metrics.ThreadLocalMetrics;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.NoPayload;
@@ -1009,7 +1010,8 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
                                 () -> EpochAwareDebounce.instance.close(),
                                 SnapshotManager.instance::close,
                                 () -> 
IndexStatusManager.instance.shutdownAndWait(1L, MINUTES),
-                                DiskErrorsHandlerService::close
+                                DiskErrorsHandlerService::close,
+                                () -> ThreadLocalMetrics.shutdownCleaner(1L, 
MINUTES)
             );
 
             internodeMessagingStarted = false;
diff --git 
a/test/microbench/org/apache/cassandra/test/microbench/MetersBench.java 
b/test/microbench/org/apache/cassandra/test/microbench/MetersBench.java
new file mode 100644
index 0000000000..5afef15dd1
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/MetersBench.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLongArray;
+
+import org.apache.cassandra.metrics.Meter;
+import org.apache.cassandra.metrics.ThreadLocalMeter;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Warmup(iterations = 4, time = 10, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 8, time = 10, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 2,
+jvmArgsAppend = { "-Djmh.executor=CUSTOM", 
"-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"})
+@Threads(4)
+@State(Scope.Benchmark)
+public class MetersBench
+{
+    @Param({ "ThreadLocal", "Dropwizard"})
+    private String type;
+
+    @Param({"10"})
+    private int metricsCount;
+
+    @Param({"true"})
+    private boolean polluteCpuCaches;
+
+    private List<Meter> meters;
+
+    @Setup(Level.Trial)
+    public void setup() throws Throwable
+    {
+        meters = new ArrayList<>(metricsCount);
+        for (int i = 0; i < metricsCount; i++)
+        {
+            Meter meter;
+            switch (type)
+            {
+                case "ThreadLocal":
+                    meter = new ThreadLocalMeter();
+                    break;
+                case "Dropwizard":
+                    meter = new DropwizardMeter();
+                    break;
+                default:
+                    throw new UnsupportedOperationException();
+            }
+            meters.add(meter);
+        }
+    }
+
+    private final AtomicLongArray anotherMemory = new AtomicLongArray(256 * 
1024);
+
+    @Setup(Level.Invocation)
+    public void polluteCpuCaches()
+    {
+        if (polluteCpuCaches)
+            for (int i = 0; i < anotherMemory.length(); i++)
+                anotherMemory.incrementAndGet(i);
+    }
+
+    @Benchmark
+    public void mark() {
+        for (Meter meter : meters)
+            meter.mark();
+    }
+
+    private static class DropwizardMeter implements Meter
+    {
+        private final com.codahale.metrics.Meter meter = new 
com.codahale.metrics.Meter();
+
+        @Override
+        public void mark()
+        {
+            meter.mark();
+        }
+
+        @Override
+        public void mark(long n)
+        {
+            meter.mark(n);
+        }
+
+        @Override
+        public long getCount()
+        {
+            return meter.getCount();
+        }
+
+        @Override
+        public double getFifteenMinuteRate()
+        {
+            return meter.getFifteenMinuteRate();
+        }
+
+        @Override
+        public double getFiveMinuteRate()
+        {
+            return meter.getFiveMinuteRate();
+        }
+
+        @Override
+        public double getMeanRate()
+        {
+            return meter.getMeanRate();
+        }
+
+        @Override
+        public double getOneMinuteRate()
+        {
+            return meter.getOneMinuteRate();
+        }
+    }
+}
diff --git 
a/test/microbench/org/apache/cassandra/test/microbench/ThreadLocalMetricsBench.java
 
b/test/microbench/org/apache/cassandra/test/microbench/ThreadLocalMetricsBench.java
new file mode 100644
index 0000000000..5a3e80b1f7
--- /dev/null
+++ 
b/test/microbench/org/apache/cassandra/test/microbench/ThreadLocalMetricsBench.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.cassandra.metrics.Counter;
+import org.apache.cassandra.metrics.ThreadLocalCounter;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Warmup(iterations = 4, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 8, time = 2, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 2,
+      jvmArgsAppend = { "-Djmh.executor=CUSTOM", 
"-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"})
+@Threads(4)
+@State(Scope.Benchmark)
+public class ThreadLocalMetricsBench
+{
+    @Param({"LongAdder", "PlainArray"})
+    private String type;
+
+    @Param({"50", "100"})
+    private int metricsCount;
+
+    private List<Counter> counters;
+
+
+    @Setup(Level.Trial)
+    public void setup() throws Throwable
+    {
+        counters = new ArrayList<>(metricsCount);
+        for (int i = 0; i < metricsCount; i++)
+        {
+            Counter counter;
+            switch (type)
+            {
+                case "LongAdder":
+                    counter = new LongAdderCounter();
+                    break;
+                case "PlainArray":
+                    counter = new ThreadLocalCounter();
+                    break;
+                default:
+                    throw new UnsupportedOperationException();
+            }
+            counters.add(counter);
+        }
+    }
+
+    private final AtomicLongArray anotherMemory = new AtomicLongArray(256 * 
1024);
+
+    @Setup(Level.Invocation)
+    public void polluteCpuCaches()
+    {
+        for (int i = 0; i < anotherMemory.length(); i++)
+            anotherMemory.incrementAndGet(i);
+    }
+
+    @Benchmark
+    public void increment()
+    {
+        for (Counter counter : counters)
+            counter.inc();
+    }
+
+    public static class LongAdderCounter implements Counter
+    {
+        private final LongAdder counter = new LongAdder();
+
+        @Override
+        public void inc()
+        {
+            counter.increment();
+        }
+
+        @Override
+        public void inc(long n)
+        {
+            counter.add(n);
+        }
+
+        @Override
+        public void dec()
+        {
+            counter.decrement();
+        }
+
+        @Override
+        public void dec(long n)
+        {
+            counter.add(-n);
+        }
+
+        @Override
+        public long getCount()
+        {
+            return counter.sum();
+        }
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/metrics/JmxVirtualTableMetricsTest.java 
b/test/unit/org/apache/cassandra/metrics/JmxVirtualTableMetricsTest.java
index cd63f03a91..9f79d656c1 100644
--- a/test/unit/org/apache/cassandra/metrics/JmxVirtualTableMetricsTest.java
+++ b/test/unit/org/apache/cassandra/metrics/JmxVirtualTableMetricsTest.java
@@ -67,7 +67,7 @@ public class JmxVirtualTableMetricsTest extends CQLTester
 
         metricToNameMap.put(MetricType.METER, registry.meter("meter"));
         metricToNameMap.put(MetricType.COUNTER, registry.counter("counter"));
-        metricToNameMap.put(MetricType.HISTOGRAM, 
registry.histogram("histogram", () -> new CassandraHistogram(new 
DecayingEstimatedHistogramReservoir(true))));
+        metricToNameMap.put(MetricType.HISTOGRAM, 
registry.histogram("histogram", () -> new ClearableHistogram(new 
DecayingEstimatedHistogramReservoir(true))));
         metricToNameMap.put(MetricType.TIMER, registry.timer("timer"));
         metricToNameMap.put(MetricType.GAUGE, registry.gauge("gauge", () -> 
gaugeValue::get));
 
diff --git a/test/unit/org/apache/cassandra/metrics/ThreadLocalCounterTest.java 
b/test/unit/org/apache/cassandra/metrics/ThreadLocalCounterTest.java
new file mode 100644
index 0000000000..ddf275a148
--- /dev/null
+++ b/test/unit/org/apache/cassandra/metrics/ThreadLocalCounterTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.LocalAwareExecutorPlus;
+
+import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
+import static org.junit.Assert.assertEquals;
+
+public class ThreadLocalCounterTest
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ThreadLocalCounterTest.class);
+
+    @Test
+    public void testLifecycleAndMultipleInstancesCreation() throws 
InterruptedException
+    {
+        final List<List<Counter>> metricsPerIteration = new ArrayList<>();
+        int METRICS_COUNT = 50;
+        int ITERATIONS_COUNT = 100;
+        long TASKS_COUNT = 100_000;
+        int THREADS = 20;
+        boolean DESTROY_COUNTERS_AT_THE_END_OF_ITERATION = true;
+
+        for (int iteration = 0; iteration < ITERATIONS_COUNT; iteration++)
+        {
+            {
+                final List<Counter> metrics = new ArrayList<>();
+                for (int i = 0; i < METRICS_COUNT; i++)
+                    metrics.add(new ThreadLocalCounter());
+                metricsPerIteration.add(metrics);
+            }
+
+            // note: these threads are FastThreadLocalThread instances
+            // so, we recycle thread local metrics using 
io.netty.util.concurrent.FastThreadLocal.onRemoval
+            LocalAwareExecutorPlus executor = executorFactory()
+                                              .localAware()
+                                              .pooled("executor-" + iteration, 
THREADS);
+
+            for (int i = 0; i < TASKS_COUNT; i++)
+            {
+                executor.submit(() -> {
+                    for (List<Counter> metricSet : metricsPerIteration)
+                        for (Counter metric : metricSet)
+                            metric.inc();
+                });
+            }
+            boolean allIncremented = false;
+            while (!allIncremented)
+            {
+                allIncremented = true;
+                for (int metricSetId = 0; metricSetId < 
metricsPerIteration.size(); metricSetId++)
+                    for (Counter metric : metricsPerIteration.get(metricSetId))
+                        allIncremented &= TASKS_COUNT * 
(metricsPerIteration.size() - metricSetId) == metric.getCount();
+            }
+            assertEquals(THREADS, 
ThreadLocalMetrics.getThreadLocalMetricsObjectsCount());
+            executor.shutdown();
+            executor.awaitTermination(30, TimeUnit.SECONDS);
+            for (int metricSetId = 0; metricSetId < 
metricsPerIteration.size(); metricSetId++)
+                for (Counter metric : metricsPerIteration.get(metricSetId))
+                    assertEquals(TASKS_COUNT * (metricsPerIteration.size() - 
metricSetId), metric.getCount());
+
+            if (DESTROY_COUNTERS_AT_THE_END_OF_ITERATION)
+            {
+                metricsPerIteration.clear();
+            }
+
+            LOGGER.info("id generator state: {}, free IDs: {}",
+                        ThreadLocalMetrics.idGenerator.get(),
+                        ThreadLocalMetrics.freeMetricIdSet);
+            LOGGER.info("iteration completed: {} / {}", iteration + 1, 
ITERATIONS_COUNT);
+        }
+    }
+
+    @Test
+    public void testBasicOperations()
+    {
+        Counter counter = new ThreadLocalCounter();
+        counter.inc();
+        assertEquals(1, counter.getCount());
+        counter.inc(15);
+        assertEquals(1 + 15, counter.getCount());
+        counter.dec(10);
+        assertEquals(1 + 15 - 10, counter.getCount());
+        counter.dec();
+        assertEquals(1 + 15 - 10 - 1, counter.getCount());
+    }
+}
diff --git a/src/java/org/apache/cassandra/metrics/CassandraHistogram.java 
b/test/unit/org/apache/cassandra/metrics/ThreadLocalHistogramTest.java
similarity index 50%
rename from src/java/org/apache/cassandra/metrics/CassandraHistogram.java
rename to test/unit/org/apache/cassandra/metrics/ThreadLocalHistogramTest.java
index 84d3df5a59..0081629f50 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraHistogram.java
+++ b/test/unit/org/apache/cassandra/metrics/ThreadLocalHistogramTest.java
@@ -18,24 +18,32 @@
 
 package org.apache.cassandra.metrics;
 
-import com.codahale.metrics.Histogram;
+import org.junit.Assert;
+import org.junit.Test;
 
-public class CassandraHistogram extends Histogram
+public class ThreadLocalHistogramTest
 {
-    final CassandraReservoir reservoir;
-    public CassandraHistogram(CassandraReservoir reservoir)
+    @Test
+    public void testBasicOperations()
     {
-        super(reservoir);
-        this.reservoir = reservoir;
+        OverrideHistogram histogram = new ThreadLocalHistogram(new 
DecayingEstimatedHistogramReservoir());
+        histogram.update(10);
+        Assert.assertEquals(1, histogram.getCount());
+        histogram.update(20);
+        Assert.assertEquals(2, histogram.getCount());
+        histogram.update(100);
+        Assert.assertEquals(3, histogram.getCount());
     }
 
-    public CassandraReservoir.BucketStrategy bucketStrategy()
+    @Test
+    public void testReset()
     {
-        return reservoir.bucketStrategy();
-    }
-
-    public long[] bucketStarts(int length)
-    {
-        return reservoir.buckets(length);
+        ClearableHistogram histogram = new ClearableHistogram(new 
DecayingEstimatedHistogramReservoir());
+        histogram.update(1);
+        histogram.update(1);
+        histogram.update(1);
+        Assert.assertEquals(3, histogram.getCount());
+        histogram.reset();
+        Assert.assertEquals(0, histogram.getCount());
     }
 }
diff --git a/test/unit/org/apache/cassandra/metrics/ThreadLocalMeterTest.java 
b/test/unit/org/apache/cassandra/metrics/ThreadLocalMeterTest.java
new file mode 100644
index 0000000000..8a467c3d6b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/metrics/ThreadLocalMeterTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.codahale.metrics.Clock;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.utils.MonotonicClock;
+import org.apache.cassandra.utils.MonotonicClockTranslation;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.offset;
+import static org.junit.Assert.assertEquals;
+
+public class ThreadLocalMeterTest
+{
+    @Test
+    public void checkNoMark()
+    {
+        DeterministicClock clock = new DeterministicClock(0);
+        ThreadLocalMeter meter = new ThreadLocalMeter(clock);
+        com.codahale.metrics.Meter codahaleMeter = new com.codahale.metrics.Meter(clock);
+        clock.setTime(TimeUnit.SECONDS.toNanos(10));
+        ThreadLocalMeter.tickAll();
+        assertMeter(meter, codahaleMeter);
+    }
+
+    @Test
+    public void constantRate()
+    {
+        DeterministicClock clock = new DeterministicClock(0);
+        ThreadLocalMeter meter = new ThreadLocalMeter(clock);
+        com.codahale.metrics.Meter codahaleMeter = new com.codahale.metrics.Meter(clock);
+
+        long seconds = TimeUnit.MINUTES.toSeconds(15);
+        for (long i = 0; i < seconds; i++)
+        {
+            ThreadLocalMeter.tickAll();
+            meter.mark();
+            codahaleMeter.mark();
+            clock.setTime(TimeUnit.SECONDS.toNanos(i + 1));
+        }
+
+        assertMeter(meter, codahaleMeter);
+    }
+
+    @Test
+    public void marksEventsAndUpdatesRatesAndCount()
+    {
+        DeterministicClock clock = new DeterministicClock(0);
+        ThreadLocalMeter meter = new ThreadLocalMeter(clock);
+        com.codahale.metrics.Meter codahaleMeter = new com.codahale.metrics.Meter(clock);
+
+        clock.setTime(TimeUnit.SECONDS.toNanos(10));
+        ThreadLocalMeter.tickAll();
+        meter.mark();
+        meter.mark(2);
+        codahaleMeter.mark();
+        codahaleMeter.mark(2);
+
+        assertMeter(meter, codahaleMeter);
+    }
+
+    @Test
+    public void pseudoRandomRateSimulation()
+    {
+        Random random = new Random(1);
+        DeterministicClock clock = new DeterministicClock(0);
+        ThreadLocalMeter meter = new ThreadLocalMeter(clock);
+        com.codahale.metrics.Meter codahaleMeter = new com.codahale.metrics.Meter(clock);
+
+        int rounds = 10_000;
+        for (int i = 0; i < rounds; i++)
+        {
+            long n = random.nextInt();
+            ThreadLocalMeter.tickAll();
+            meter.mark(n);
+            codahaleMeter.mark(n);
+            clock.setTime(clock.now() + random.nextInt());
+        }
+        assertMeter(meter, codahaleMeter);
+    }
+
+    @Test // CASSANDRA-19332
+    public void testMaxTicks()
+    {
+        DeterministicClock clock = new DeterministicClock(0);
+        ThreadLocalMeter threadLocalMeter = new ThreadLocalMeter(clock);
+        clock.setTime(Long.MAX_VALUE);
+        threadLocalMeter.mark(Long.MAX_VALUE);
+        ThreadLocalMeter.tickAll();
+        final long secondNanos = TimeUnit.SECONDS.toNanos(1);
+        assertEquals(threadLocalMeter.getOneMinuteRate(), Double.MIN_NORMAL * secondNanos, 0.0);
+        assertEquals(threadLocalMeter.getFiveMinuteRate(), Double.MIN_NORMAL * secondNanos, 0.0);
+        assertEquals(threadLocalMeter.getFifteenMinuteRate(), Double.MIN_NORMAL * secondNanos, 0.0);
+    }
+
+    @Test
+    public void testAllocationAndDestroy()
+    {
+        DeterministicClock clock = new DeterministicClock(0);
+        Random random = new Random(42);
+        List<MeterPair> meters = new ArrayList<>();
+        for (int i = 0; i < 50_000; i++)
+        {
+            boolean create = random.nextBoolean();
+            if (create)
+            {
+                MeterPair pair = new MeterPair();
+                pair.meter = new ThreadLocalMeter(clock);
+                pair.codahaleMeter = new com.codahale.metrics.Meter(clock);
+                meters.add(pair);
+            }
+            else if (!meters.isEmpty())
+            {
+               int meterToRemove = random.nextInt(meters.size());
+               meters.remove(meterToRemove);
+            }
+            ThreadLocalMeter.tickAll();
+            for (MeterPair meterPair : meters)
+            {
+                meterPair.meter.mark();
+                meterPair.codahaleMeter.mark();
+                assertMeter(meterPair.meter, meterPair.codahaleMeter);
+            }
+            // note: Random.nextLong(long) is not available in Java 11
+            clock.setTime(clock.now() + random.nextInt((int)TimeUnit.SECONDS.toNanos(10)));
+        }
+
+        int NUMBER_OF_COUNTERS_PER_METER = 2;
+        Util.spinAssertEquals(NUMBER_OF_COUNTERS_PER_METER * meters.size(), () -> {
+            System.gc(); // to trigger PhantomReferences queuing and recycle unused Meter thread local counters
+            return ThreadLocalMetrics.getAllocatedMetricsCount();
+        }, 20);
+
+        ThreadLocalMeter.tickAll(); // to force recycling of empty weak references
+        Assert.assertEquals(meters.size(), ThreadLocalMeter.getTickingMetersCount());
+        Assert.assertEquals(1, ThreadLocalMetrics.getThreadLocalMetricsObjectsCount());
+
+    }
+
+    private static class MeterPair
+    {
+        ThreadLocalMeter meter;
+        com.codahale.metrics.Meter codahaleMeter;
+
+    }
+
+    private static void assertMeter(ThreadLocalMeter checkingMeter, com.codahale.metrics.Meter standardMeter)
+    {
+        assertThat(checkingMeter.getCount()).isEqualTo(standardMeter.getCount());
+        assertThat(checkingMeter.getMeanRate()).isEqualTo(standardMeter.getMeanRate(), offset(0.001));
+        assertThat(checkingMeter.getOneMinuteRate()).isEqualTo(standardMeter.getOneMinuteRate(), offset(0.001));
+        assertThat(checkingMeter.getFiveMinuteRate()).isEqualTo(standardMeter.getFiveMinuteRate(), offset(0.001));
+        assertThat(checkingMeter.getFifteenMinuteRate()).isEqualTo(standardMeter.getFifteenMinuteRate(), offset(0.001));
+    }
+
+    private static class DeterministicClock extends Clock implements MonotonicClock
+    {
+        private volatile long tickNs;
+
+        public DeterministicClock(long initialTime)
+        {
+            tickNs = initialTime;
+        }
+
+        public void setTime(long tickNs)
+        {
+            this.tickNs = tickNs;
+        }
+
+        @Override
+        public long getTick()
+        {
+            return tickNs;
+        }
+
+        @Override
+        public long now()
+        {
+            return tickNs;
+        }
+
+        @Override
+        public long error()
+        {
+            return 0;
+        }
+
+        @Override
+        public MonotonicClockTranslation translate()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isAfter(long instant)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isAfter(long now, long instant)
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/tools/OfflineToolUtils.java b/test/unit/org/apache/cassandra/tools/OfflineToolUtils.java
index 9a5ae8e8b8..bfc3a7f5b8 100644
--- a/test/unit/org/apache/cassandra/tools/OfflineToolUtils.java
+++ b/test/unit/org/apache/cassandra/tools/OfflineToolUtils.java
@@ -78,6 +78,7 @@ public abstract class OfflineToolUtils
                        // and may still be active when we check
     "Attach Listener", // spawned in intellij IDEA
     "JNA Cleaner",     // spawned by JNA
+    "ThreadLocalMetrics-Cleaner", // spawned by org.apache.cassandra.metrics.ThreadLocalMetrics
     };
 
     static final String[] NON_DEFAULT_MEMTABLE_THREADS =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to