This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 4973-new-monitor-metrics
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/4973-new-monitor-metrics by
this push:
new 7ef6cf5f6f Uses more concurrent data structs in new monitor
7ef6cf5f6f is described below
commit 7ef6cf5f6f1d57adc3f3f7dd6e7f5df986ae4599
Author: Keith Turner <[email protected]>
AuthorDate: Fri Nov 22 20:47:30 2024 +0000
Uses more concurrent data structs in new monitor
In the new monitor code, the SystemInformation class has
ConcurrentHashMaps whose values are non-concurrent data structures.
This was causing problems when multiple threads accessed them. Changed the
values to be concurrent. Also changed a few other maps to be concurrent,
mainly so that the class no longer needs the normal HashMap import.
---
.../accumulo/monitor/next/SystemInformation.java | 45 ++++++++++++----------
1 file changed, 25 insertions(+), 20 deletions(-)
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
index a00b53b26f..a6f9f62e66 100644
---
a/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
+++
b/server/monitor/src/main/java/org/apache/accumulo/monitor/next/SystemInformation.java
@@ -22,8 +22,7 @@ import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -243,13 +242,13 @@ public class SystemInformation {
}
public static class ProcessSummary {
- private long configured = 0;
- private long responded = 0;
- private Set<String> notResponded = new HashSet<>();
+ private final AtomicLong configured = new AtomicLong(0);
+ private final AtomicLong responded = new AtomicLong();
+ private final Set<String> notResponded = ConcurrentHashMap.newKeySet();
public void addResponded() {
- configured++;
- responded++;
+ configured.incrementAndGet();
+ responded.incrementAndGet();
}
public void addNotResponded(ServerId server) {
@@ -257,11 +256,11 @@ public class SystemInformation {
}
public long getConfigured() {
- return this.configured;
+ return this.configured.get();
}
public long getResponded() {
- return this.responded;
+ return this.responded.get();
}
public long getNotResponded() {
@@ -283,8 +282,8 @@ public class SystemInformation {
private final Cache<ServerId,MetricResponse> allMetrics;
- private final Set<String> resourceGroups = new HashSet<>();
- private final Set<ServerId> problemHosts = new HashSet<>();
+ private final Set<String> resourceGroups = ConcurrentHashMap.newKeySet();
+ private final Set<ServerId> problemHosts = ConcurrentHashMap.newKeySet();
private final AtomicReference<ServerId> manager = new AtomicReference<>();
private final AtomicReference<ServerId> gc = new AtomicReference<>();
@@ -312,7 +311,7 @@ public class SystemInformation {
new ConcurrentHashMap<>();
// Compaction Information
- private final Map<String,List<FMetric>> queueMetrics = new HashMap<>();
+ private final Map<String,List<FMetric>> queueMetrics = new
ConcurrentHashMap<>();
private final AtomicReference<Map<String,TExternalCompaction>>
runningCompactions =
new AtomicReference<>();
private final AtomicReference<Map<Long,String>>
runningCompactionsDurationIndex =
@@ -323,7 +322,7 @@ public class SystemInformation {
private final Map<String,List<TabletInformation>> tablets = new
ConcurrentHashMap<>();
// Deployment Overview
- private final Map<String,Map<String,ProcessSummary>> deployment = new
HashMap<>();
+ private final Map<String,Map<String,ProcessSummary>> deployment = new
ConcurrentHashMap<>();
public SystemInformation(Cache<ServerId,MetricResponse> allMetrics) {
this.allMetrics = allMetrics;
@@ -389,7 +388,9 @@ public class SystemInformation {
for (int i = 0; i < fm.tagsLength(); i++) {
FTag t = fm.tags(i);
if (t.key().equals("queue.id")) {
- queueMetrics.computeIfAbsent(t.value(), (k) -> new
ArrayList<>()).add(fm);
+ queueMetrics
+ .computeIfAbsent(t.value(), (k) ->
Collections.synchronizedList(new ArrayList<>()))
+ .add(fm);
}
}
}
@@ -402,7 +403,8 @@ public class SystemInformation {
resourceGroups.add(response.getResourceGroup());
switch (response.serverType) {
case COMPACTOR:
- compactors.computeIfAbsent(response.getResourceGroup(), (rg) -> new
HashSet<>())
+ compactors
+ .computeIfAbsent(response.getResourceGroup(), (rg) ->
ConcurrentHashMap.newKeySet())
.add(server);
updateAggregates(response, totalCompactorMetrics, rgCompactorMetrics);
break;
@@ -418,11 +420,13 @@ public class SystemInformation {
createCompactionSummary(response);
break;
case SCAN_SERVER:
- sservers.computeIfAbsent(response.getResourceGroup(), (rg) -> new
HashSet<>()).add(server);
+ sservers.computeIfAbsent(response.getResourceGroup(), (rg) ->
ConcurrentHashMap.newKeySet())
+ .add(server);
updateAggregates(response, totalSServerMetrics, rgSServerMetrics);
break;
case TABLET_SERVER:
- tservers.computeIfAbsent(response.getResourceGroup(), (rg) -> new
HashSet<>()).add(server);
+ tservers.computeIfAbsent(response.getResourceGroup(), (rg) ->
ConcurrentHashMap.newKeySet())
+ .add(server);
updateAggregates(response, totalTServerMetrics, rgTServerMetrics);
break;
default:
@@ -451,7 +455,8 @@ public class SystemInformation {
public void processTabletInformation(String tableName, TabletInformation
info) {
final SanitizedTabletInformation sti = new
SanitizedTabletInformation(info);
- tablets.computeIfAbsent(tableName, (t) -> new ArrayList<>()).add(sti);
+ tablets.computeIfAbsent(tableName, (t) -> Collections.synchronizedList(new
ArrayList<>()))
+ .add(sti);
tables.computeIfAbsent(tableName, (t) -> new
TableSummary()).addTablet(sti);
}
@@ -462,11 +467,11 @@ public class SystemInformation {
public void finish() {
// Iterate over the metrics
allMetrics.asMap().keySet().forEach(serverId -> {
- deployment.computeIfAbsent(serverId.getResourceGroup(), g -> new
HashMap<>())
+ deployment.computeIfAbsent(serverId.getResourceGroup(), g -> new
ConcurrentHashMap<>())
.computeIfAbsent(serverId.getType().name(), t -> new
ProcessSummary()).addResponded();
});
problemHosts.forEach(serverId -> {
- deployment.computeIfAbsent(serverId.getResourceGroup(), g -> new
HashMap<>())
+ deployment.computeIfAbsent(serverId.getResourceGroup(), g -> new
ConcurrentHashMap<>())
.computeIfAbsent(serverId.getType().name(), t -> new
ProcessSummary())
.addNotResponded(serverId);
});