AndrewJSchofield commented on code in PR #16488: URL: https://github.com/apache/kafka/pull/16488#discussion_r1666543803
########## core/src/main/java/kafka/server/share/SharePartitionManager.java: ########## @@ -696,4 +722,99 @@ public ShareFetchPartitionData(FetchParams fetchParams, String groupId, String m this.partitionMaxBytes = partitionMaxBytes; } } + + static class ShareGroupMetrics { + /** + * share-acknowledgement (share-acknowledgement-rate and share-acknowledgement-count) - The total number of offsets acknowledged for share groups (requests to be ack). + * record-acknowledgement (record-acknowledgement-rate and record-acknowledgement-count) - The number of records acknowledged per acknowledgement type. + * partition-load-time (partition-load-time-avg and partition-load-time-max) - The time taken to load the share partitions. + */ + + public static final String METRICS_GROUP_NAME = "share-group-metrics"; + + public static final String SHARE_ACK_SENSOR = "share-acknowledgement-sensor"; + public static final String SHARE_ACK_RATE = "share-acknowledgement-rate"; + public static final String SHARE_ACK_COUNT = "share-acknowledgement-count"; + + public static final String RECORD_ACK_SENSOR_PREFIX = "record-acknowledgement"; + public static final String RECORD_ACK_RATE = "record-acknowledgement-rate"; + public static final String RECORD_ACK_COUNT = "record-acknowledgement-count"; + public static final String ACK_TYPE = "ack-type"; + + public static final String PARTITION_LOAD_TIME_SENSOR = "partition-load-time-sensor"; + public static final String PARTITION_LOAD_TIME_AVG = "partition-load-time-avg"; + public static final String PARTITION_LOAD_TIME_MAX = "partition-load-time-max"; + + public static final Map<Byte, String> RECORD_ACKS_MAP = new HashMap<>(); + + private final Time time; + private final Sensor shareAcknowledgementSensor; + private final Map<Byte, Sensor> recordAcksSensorMap = new HashMap<>(); + private final Sensor partitionLoadTimeSensor; + + static { + RECORD_ACKS_MAP.put((byte) 1, AcknowledgeType.ACCEPT.toString()); + RECORD_ACKS_MAP.put((byte) 2, 
AcknowledgeType.RELEASE.toString()); + RECORD_ACKS_MAP.put((byte) 3, AcknowledgeType.REJECT.toString()); + } + + public ShareGroupMetrics(Metrics metrics, Time time) { + this.time = time; + + shareAcknowledgementSensor = metrics.sensor(SHARE_ACK_SENSOR); + shareAcknowledgementSensor.add(new Meter( + metrics.metricName( + SHARE_ACK_RATE, + METRICS_GROUP_NAME, + "The rate of number of acknowledge requests."), + metrics.metricName( + SHARE_ACK_COUNT, + METRICS_GROUP_NAME, + "The number of acknowledge requests."))); + + for (Map.Entry<Byte, String> entry : RECORD_ACKS_MAP.entrySet()) { + recordAcksSensorMap.put(entry.getKey(), metrics.sensor(String.format("%s-%s-sensor", RECORD_ACK_SENSOR_PREFIX, entry.getValue()))); + recordAcksSensorMap.get(entry.getKey()) + .add(new Meter( + metrics.metricName( + RECORD_ACK_RATE, + METRICS_GROUP_NAME, + "The rate of number of records acknowledged per acknowledgement type.", + ACK_TYPE, entry.getValue()), + metrics.metricName( + RECORD_ACK_COUNT, + METRICS_GROUP_NAME, + "The number of records acknowledged per acknowledgement type.", + ACK_TYPE, entry.getValue()))); + } + + partitionLoadTimeSensor = metrics.sensor(PARTITION_LOAD_TIME_SENSOR); + partitionLoadTimeSensor.add(metrics.metricName( + PARTITION_LOAD_TIME_AVG, + METRICS_GROUP_NAME, + "The average time in milliseconds to load the share partitions."), + new Avg()); + partitionLoadTimeSensor.add(metrics.metricName( + PARTITION_LOAD_TIME_MAX, + METRICS_GROUP_NAME, + "The maximum time in milliseconds to load the share partitions."), + new Max()); + } + + void shareAcknowledgement() { + shareAcknowledgementSensor.record(); + } + + void recordAcknowledgement(byte ackType) { + if (recordAcksSensorMap.containsKey(ackType)) { + recordAcksSensorMap.get(ackType).record(); + } else { Review Comment: @apoorvmittal10 I've seen this line being logged during integration tests. How do you think we should handle this? 
Either we need to ensure that metrics are only recorded for ack types that the metrics understand, or we need to let the metrics simply ignore types they do not understand (which is my preferred option). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org