divijvaidya commented on code in PR #12314:
URL: https://github.com/apache/kafka/pull/12314#discussion_r901646698
##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -166,6 +171,13 @@ private class EventHandler implements Runnable {
* A condition variable for waking up the event handler thread.
*/
private final Condition cond = lock.newCondition();
+ private final Consumer<Integer> onQueueSizeChange;
+
+ private AtomicInteger size = new AtomicInteger(0);
Review Comment:
Long? Is there a reason we are limiting this to an integer?
##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -178,15 +190,18 @@ public void run() {
}
private void remove(EventContext eventContext) {
- eventContext.remove();
+ boolean removed = eventContext.remove();
if (eventContext.deadlineNs.isPresent()) {
- deadlineMap.remove(eventContext.deadlineNs.getAsLong());
+ removed |=
deadlineMap.remove(eventContext.deadlineNs.getAsLong()) != null;
eventContext.deadlineNs = OptionalLong.empty();
}
if (eventContext.tag != null) {
tagToEventContext.remove(eventContext.tag, eventContext);
eventContext.tag = null;
}
+ if (removed) {
+ onQueueSizeChange.accept(size.decrementAndGet());
Review Comment:
I am a bit sceptical about running an arbitrary consumer in a synchronized
block of code (this runs by acquiring a lock). Because the caller can
potentially perform expensive things in callback which could cause block the
other threads from accessing the lock, hence, block critical execution while
performing a side-effect. Instead, could we make it an async explicit metric
publish event?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]