AndrewJSchofield commented on code in PR #17946:
URL: https://github.com/apache/kafka/pull/17946#discussion_r1858977786


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1223,6 +1223,11 @@ private void ensureValidRecordSize(int size) {
      */
     @Override
     public void flush() {
+        if (Thread.currentThread() == this.ioThread) {
+            log.error("flush invocation detected in the callback. This can cause a deadlock due to thread blocking.");

Review Comment:
   I would change the wording here. The code prevents the call, so there is no 
actual risk of a deadlock once this check is in place. Something like 
"KafkaProducer.flush() invocation detected inside callback. This is not 
permitted because of the risk of deadlock."
   
   The message in the exception could be the same. The choice of KafkaException 
seems most appropriate to me.
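
   To make the suggestion concrete, here is a minimal, self-contained sketch of 
the guard with the proposed wording. It is not the actual `KafkaProducer` code: 
`GuardedProducer` and `runCallbackOnIoThread` are hypothetical names, and 
`IllegalStateException` stands in for `KafkaException` only so that the snippet 
compiles without the Kafka client on the classpath.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for KafkaProducer; it models only the thread check. */
class GuardedProducer {
    // Proposed wording, shared by the log line and the exception.
    static final String MESSAGE =
        "KafkaProducer.flush() invocation detected inside callback. "
        + "This is not permitted because of the risk of deadlock.";

    // Plays the role of the producer's I/O (sender) thread.
    private volatile Thread ioThread;

    /**
     * Runs the callback on a fresh thread that is registered as the I/O thread,
     * and returns whatever the callback threw (or null if it completed normally).
     */
    Throwable runCallbackOnIoThread(Runnable callback) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread t = new Thread(() -> {
            try {
                callback.run();
            } catch (Throwable e) {
                failure.set(e);
            }
        });
        ioThread = t; // registered before start(), so the callback sees it
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failure.get();
    }

    void flush() {
        if (Thread.currentThread() == ioThread) {
            // The real code would also log MESSAGE at error level and
            // throw KafkaException (assumption: same message for both).
            throw new IllegalStateException(MESSAGE);
        }
        // A real flush would block here until in-flight records complete.
    }
}
```

   In this sketch, calling `flush()` from an ordinary thread returns normally, 
while `runCallbackOnIoThread(producer::flush)` returns the exception carrying 
the message above. The call is rejected up front, which is why the message 
describes it as not permitted.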



##########
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##########
@@ -2154,6 +2154,32 @@ public void testCallbackAndInterceptorHandleError() {
         }
     }
 
+    @Test
+    public void shouldNotInvokeFlushInCallback() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);

Review Comment:
   Is this setting relevant? Its presence suggests that the test only works 
with idempotence disabled, but I do not believe that to be the case.



