lianetm commented on code in PR #19980:
URL: https://github.com/apache/kafka/pull/19980#discussion_r2298036689


##########
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java:
##########
@@ -1588,6 +1593,103 @@ private void sendCompressedMessages(int numRecords, TopicPartition tp) {
         }
     }
 
+    // Override the default test timeout of 60 seconds since the core logic of the test allows up to 60 seconds to
+    // pass to detect the issue.
+    @Timeout(75)
+    @ClusterTest
+    public void testClassicConsumerStallBetweenPoll() throws Exception {
+        testStallBetweenPoll(GroupProtocol.CLASSIC);
+    }
+
+    // Override the default test timeout of 60 seconds since the core logic of the test allows up to 60 seconds to
+    // pass to detect the issue.
+    @Timeout(75)
+    @ClusterTest
+    public void testAsyncConsumerStallBetweenPoll() throws Exception {
+        testStallBetweenPoll(GroupProtocol.CONSUMER);
+    }
+
+    /**
+     * This test is to prove that the intermittent stalling that has been experienced when using the asynchronous
+     * consumer, as filed under KAFKA-19259, has been fixed.
+     *
+     * <p/>
+     *
+     * The basic idea is to have one thread that produces a record every 500 ms and the main thread that consumes
+     * records without pausing between polls for much more than the produce delay. In the test case filed in
+     * KAFKA-19259, the consumer sometimes pauses for up to 5-10 seconds despite records being produced every
+     * quarter second.
+     */
+    private void testStallBetweenPoll(GroupProtocol groupProtocol) throws Exception {
+        var testTopic = "stutter-test-topic";
+        var numPartitions = 6;
+        cluster.createTopic(testTopic, numPartitions, (short) BROKER_COUNT);
+
+        // Give the test one minute to detect a stall.
+        var testTimeout = 60000;
+
+        // The producer must produce slowly to tickle the scenario.
+        var produceWait = 500;
+
+        // Assign a tolerance for how much time is allowed to pass between Consumer.poll() calls given that there
+        // should be *at least* one record to read every second.
+        var delayTolerance = produceWait * 2;
+
+        try (var producer = cluster.producer()) {
+            // Start a thread running that produces records at a relative trickle.
+            var producerThread = new Thread(() -> {
+                while (true) {
+                    try {
+                        Utils.sleep(produceWait);
+                        producer.send(new ProducerRecord<>(testTopic, TestUtils.randomBytes(64))).get();
+                    } catch (InterruptedException e) {
+                        break;
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+            producerThread.start();
+
+            Map<String, Object> consumerConfig = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT));
+
+            try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) {
+                consumer.subscribe(List.of(testTopic));
+
+                // This is just to wait until the group membership and assignment are in place.
+                awaitNonEmptyRecords(consumer, new TopicPartition(testTopic, 0));
+
+                var testTimer = Time.SYSTEM.timer(testTimeout);
+
+                // Keep track of the last time the poll is invoked to ensure the deltas between invocations don't
+                // exceed the delay threshold defined above.
+                var lastPoll = System.currentTimeMillis();
+
+                while (testTimer.notExpired()) {

Review Comment:
   this means that this test will run for the full testTimeout of 60s (hence the @Timeout raised to 75s, i.e. 1min 15s), but can't we simplify and detect the issue much sooner?
   
   The faulty behaviour is that one poll runs fast, and the next one takes the full poll timeout, right? So, unless I'm missing something, wouldn't 2 polls be enough? (also considering that `awaitNonEmptyRecords` already polls before the actual call to consumer.poll on ln 1670)
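   Just to sketch what I mean (this reuses the test's `consumer` and `delayTolerance` and assumes JUnit 5's `assertTrue` plus `java.time.Duration`; not meant as a drop-in replacement):
   
   ```java
   // After awaitNonEmptyRecords(...) confirms the assignment is in place, two
   // timed polls should suffice: with a record produced every 500 ms, neither
   // call should come anywhere near the full 1-second poll timeout.
   for (int i = 0; i < 2; i++) {
       long start = System.currentTimeMillis();
       consumer.poll(Duration.ofSeconds(1));
       long elapsed = System.currentTimeMillis() - start;
       assertTrue(elapsed < delayTolerance,
           "poll() stalled for " + elapsed + " ms despite records arriving every 500 ms");
   }
   ```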



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -222,6 +222,10 @@ protected void handleFetchSuccess(final Node fetchTarget,
                 fetchBuffer.add(completedFetch);
             }
 
+            // "Wake" the fetch buffer on any response, even if it's empty, to 
allow the consumer to not block
+            // indefinitely waiting on the fetch buffer to get data.
+            fetchBuffer.wakeup();

Review Comment:
   Should we notify the buffer as soon as we add the first completed batch to it, instead of waiting for all completed batches to be added?
   
   That seems closer to what the `ClassicConsumer` does by polling while `!fetcher.hasAvailableFetches()` (given that `hasAvailableFetches()` will be true as soon as the first `CompletedFetch` is added)
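   Something along these lines, as a sketch (the loop is a simplification, with a hypothetical `completedFetches` collection standing in for however `handleFetchSuccess` actually iterates the response):
   
   ```java
   for (CompletedFetch completedFetch : completedFetches) {
       fetchBuffer.add(completedFetch);
       // Hypothetical placement: wake the application thread as soon as the
       // first batch lands in the buffer, rather than once after the whole
       // response has been processed, mirroring the classic consumer's poll
       // loop that exits as soon as fetcher.hasAvailableFetches() is true.
       fetchBuffer.wakeup();
   }
   ```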



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
