divijvaidya commented on code in PR #12314:
URL: https://github.com/apache/kafka/pull/12314#discussion_r901646698


##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -166,6 +171,13 @@ private class EventHandler implements Runnable {
          * A condition variable for waking up the event handler thread.
          */
         private final Condition cond = lock.newCondition();
+        private final Consumer<Integer> onQueueSizeChange;
+
+        private AtomicInteger size = new AtomicInteger(0);

Review Comment:
   Long? Is there a reason we are limiting this to an integer?



##########
server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java:
##########
@@ -178,15 +190,18 @@ public void run() {
         }
 
         private void remove(EventContext eventContext) {
-            eventContext.remove();
+            boolean removed = eventContext.remove();
             if (eventContext.deadlineNs.isPresent()) {
-                deadlineMap.remove(eventContext.deadlineNs.getAsLong());
+                removed |= 
deadlineMap.remove(eventContext.deadlineNs.getAsLong()) != null;
                 eventContext.deadlineNs = OptionalLong.empty();
             }
             if (eventContext.tag != null) {
                 tagToEventContext.remove(eventContext.tag, eventContext);
                 eventContext.tag = null;
             }
+            if (removed) {
+                onQueueSizeChange.accept(size.decrementAndGet());

Review Comment:
   I am a bit sceptical about running an arbitrary consumer in a synchronized 
block of code (this runs by acquiring a lock). Because the caller can 
potentially perform expensive things in callback which could cause block the 
other threads from accessing the lock, hence, block critical execution while 
performing a side-effect. Instead, could we make it an async explicit metric 
publish event?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to