dongjinleekr commented on a change in pull request #11586:
URL: https://github.com/apache/kafka/pull/11586#discussion_r805725430
##########
File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java
##########
@@ -1305,44 +1314,62 @@ private Sensor sensor(String name, Sensor... parents) {
}
public void maybeRegisterConnectionMetrics(String connectionId) {
- if (!connectionId.isEmpty() && metricsPerConnection) {
- // if one sensor of the metrics has been registered for the
connection,
- // then all other sensors should have been registered; and
vice versa
- String nodeRequestName = "node-" + connectionId +
".requests-sent";
- Sensor nodeRequest = this.metrics.getSensor(nodeRequestName);
- if (nodeRequest == null) {
- Map<String, String> tags = new LinkedHashMap<>(metricTags);
- tags.put("node-id", "node-" + connectionId);
-
- nodeRequest = sensor(nodeRequestName);
- nodeRequest.add(createMeter(metrics,
perConnectionMetricGrpName, tags, new WindowedCount(), "request", "requests
sent"));
- MetricName metricName =
metrics.metricName("request-size-avg", perConnectionMetricGrpName, "The average
size of requests sent.", tags);
- nodeRequest.add(metricName, new Avg());
- metricName = metrics.metricName("request-size-max",
perConnectionMetricGrpName, "The maximum size of any request sent.", tags);
- nodeRequest.add(metricName, new Max());
-
- String bytesSentName = "node-" + connectionId +
".bytes-sent";
- Sensor bytesSent = sensor(bytesSentName);
- bytesSent.add(createMeter(metrics,
perConnectionMetricGrpName, tags, "outgoing-byte", "outgoing bytes"));
-
- String nodeResponseName = "node-" + connectionId +
".responses-received";
- Sensor nodeResponse = sensor(nodeResponseName);
- nodeResponse.add(createMeter(metrics,
perConnectionMetricGrpName, tags, new WindowedCount(), "response", "responses
received"));
-
- String bytesReceivedName = "node-" + connectionId +
".bytes-received";
- Sensor bytesReceive = sensor(bytesReceivedName);
- bytesReceive.add(createMeter(metrics,
perConnectionMetricGrpName, tags, "incoming-byte", "incoming bytes"));
-
- String nodeTimeName = "node-" + connectionId + ".latency";
- Sensor nodeRequestTime = sensor(nodeTimeName);
- metricName = metrics.metricName("request-latency-avg",
perConnectionMetricGrpName, tags);
- nodeRequestTime.add(metricName, new Avg());
- metricName = metrics.metricName("request-latency-max",
perConnectionMetricGrpName, tags);
- nodeRequestTime.add(metricName, new Max());
- }
+ if (!connectionId.isEmpty() && connectionMetrics != null) {
+ connectionMetrics.computeIfAbsent(connectionId, (key) -> {
+ // key: connection id
+ // value: set of sensors (currently null)
+ return perConnectionSensors(key);
+ });
}
}
+ public void maybeUnregisterConnectionMetrics(String connectionId) {
+ if (!connectionId.isEmpty() && connectionMetrics != null) {
+ connectionMetrics.computeIfPresent(connectionId, (key, value)
-> {
+ // key: connection id
+ // value: set of sensors
+ for (Sensor sensor : value) {
+ metrics.removeSensor(sensor.name());
+ }
+ return null;
+ });
+ }
+ }
+
+ private Set<Sensor> perConnectionSensors(String connectionId) {
Review comment:
So... `ConnectionMetrics` should hold all the metrics related to given
`connectionId`, similar to `GroupCoordinatorMetrics` or `SenderMetrics`. Do I
understand correctly?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]