[ https://issues.apache.org/jira/browse/KAFKA-9998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17111317#comment-17111317 ]

radai rosenblatt commented on KAFKA-9998:
-----------------------------------------

I think that's wrong, for several reasons:
 # the io thread will (eventually) terminate because this.sender.initiateClose() has been called - so there's no resource leak, just a delay in freeing resources.
 # just like the scheduler/executor-service classes in the JDK, there's no need for close() to block until all resources have been released.
 # this violates the caller's timeout argument, which the close method appears to honor everywhere else - that's inconsistent API behavior: either the producer respects the timeout or it doesn't. "sometimes" is hard to explain to users.

seems to me this part was just forgotten when timeout support was added
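As a sketch of the alternative, the force-close path could reuse whatever remains of the caller's timeout budget instead of joining unboundedly. The class and names below (BoundedCloseSketch, the simulated ioThread) are illustrative, not Kafka's actual code:

```java
// Sketch: track a deadline once, then bound every join() against it,
// so close() returns within the caller's timeout even if the io thread
// is stuck in a long-running user callback.
public class BoundedCloseSketch {
    public static void main(String[] args) throws Exception {
        // Simulate an io thread blocked in a (very) long user callback.
        Thread ioThread = new Thread(() -> {
            try { Thread.sleep(60_000); } catch (InterruptedException ignored) {}
        });
        ioThread.start();

        long timeoutMs = 200;
        long deadline = System.currentTimeMillis() + timeoutMs;

        // Graceful join: bounded by the caller's timeout.
        ioThread.join(Math.max(1, deadline - System.currentTimeMillis()));

        if (ioThread.isAlive()) {
            // Force-close path: instead of an unbounded join(), use the
            // remaining budget (at least 1 ms) so the timeout is honored.
            long remaining = Math.max(1, deadline - System.currentTimeMillis());
            ioThread.join(remaining);
        }

        // close() returns even though the io thread is still alive.
        System.out.println("returned with ioThread alive = " + ioThread.isAlive());
        ioThread.interrupt(); // clean up the simulated stuck thread
    }
}
```

Here close() returns after roughly timeoutMs even though the thread never finished; the stuck thread is left to terminate on its own once initiateClose() takes effect.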

> KafkaProducer.close(timeout) still may block indefinitely
> ---------------------------------------------------------
>
>                 Key: KAFKA-9998
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9998
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.4.1
>            Reporter: radai rosenblatt
>            Priority: Major
>
> looking at KafkaProducer.close(timeout), we have this:
> {code:java}
> private void close(Duration timeout, boolean swallowException) {
>     long timeoutMs = timeout.toMillis();
>     if (timeoutMs < 0)
>         throw new IllegalArgumentException("The timeout cannot be negative.");
>     log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);
>     // this will keep track of the first encountered exception
>     AtomicReference<Throwable> firstException = new AtomicReference<>();
>     boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
>     if (timeoutMs > 0) {
>         if (invokedFromCallback) {
>             log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
>                     "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.",
>                     timeoutMs);
>         } else {
>             // Try to close gracefully.
>             if (this.sender != null)
>                 this.sender.initiateClose();
>             if (this.ioThread != null) {
>                 try {
>                     this.ioThread.join(timeoutMs);    <---- GRACEFUL JOIN
>                 } catch (InterruptedException t) {
>                     firstException.compareAndSet(null, new InterruptException(t));
>                     log.error("Interrupted while joining ioThread", t);
>                 }
>             }
>         }
>     }
>     if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
>         log.info("Proceeding to force close the producer since pending requests could not be completed " +
>                 "within timeout {} ms.", timeoutMs);
>         this.sender.forceClose();
>         // Only join the sender thread when not calling from callback.
>         if (!invokedFromCallback) {
>             try {
>                 this.ioThread.join();   <----- UNBOUNDED JOIN
>             } catch (InterruptedException e) {
>                 firstException.compareAndSet(null, new InterruptException(e));
>             }
>         }
>     }
> ...
> }
> {code}
> specifically in our case the ioThread was running a (very) long running 
> user-provided callback which was preventing the producer from closing within 
> the given timeout.
>  
> I think the 2nd join() call should either be _VERY_ short (since we're 
> already past the timeout at that stage) or should not happen at all.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
