[ https://issues.apache.org/jira/browse/KAFKA-9998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17111317#comment-17111317 ]
radai rosenblatt commented on KAFKA-9998:
-----------------------------------------

I think that's wrong, for several reasons:

# The io thread will (eventually) terminate because this.sender.initiateClose() has been called, so there is no resource leak, just a delay in freeing resources.
# Just like the scheduler/executor service classes in the JDK, there is no need to make close() block until all resources have been released.
# This violates the caller thread's timeout argument, which the close method honors everywhere else. That is inconsistent API behavior: either the producer respects the timeout or it does not. "Sometimes" is hard to explain to users.

It seems to me this part was just forgotten when timeout support was added.

> KafkaProducer.close(timeout) still may block indefinitely
> ---------------------------------------------------------
>
>                 Key: KAFKA-9998
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9998
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.4.1
>            Reporter: radai rosenblatt
>            Priority: Major
>
> Looking at KafkaProducer.close(timeout), we have this:
> {code:java}
> private void close(Duration timeout, boolean swallowException) {
>     long timeoutMs = timeout.toMillis();
>     if (timeoutMs < 0)
>         throw new IllegalArgumentException("The timeout cannot be negative.");
>     log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);
>
>     // this will keep track of the first encountered exception
>     AtomicReference<Throwable> firstException = new AtomicReference<>();
>     boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
>     if (timeoutMs > 0) {
>         if (invokedFromCallback) {
>             log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
>                     "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.",
>                     timeoutMs);
>         } else {
>             // Try to close gracefully.
>             if (this.sender != null)
>                 this.sender.initiateClose();
>             if (this.ioThread != null) {
>                 try {
>                     this.ioThread.join(timeoutMs); // <---- GRACEFUL JOIN
>                 } catch (InterruptedException t) {
>                     firstException.compareAndSet(null, new InterruptException(t));
>                     log.error("Interrupted while joining ioThread", t);
>                 }
>             }
>         }
>     }
>     if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
>         log.info("Proceeding to force close the producer since pending requests could not be completed " +
>                 "within timeout {} ms.", timeoutMs);
>         this.sender.forceClose();
>         // Only join the sender thread when not calling from callback.
>         if (!invokedFromCallback) {
>             try {
>                 this.ioThread.join(); // <----- UNBOUNDED JOIN
>             } catch (InterruptedException e) {
>                 firstException.compareAndSet(null, new InterruptException(e));
>             }
>         }
>     }
>     ...
> }
> {code}
> Specifically, in our case the ioThread was running a (very) long-running user-provided callback, which was preventing the producer from closing within the given timeout.
>
> I think the 2nd join() call should either be _VERY_ short (since we're already past the timeout at that stage) or should not happen at all.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
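[Editorial note] The bounded-join alternative the reporter suggests can be sketched as below. This is a standalone illustration, not Kafka code: the stuck thread stands in for an ioThread blocked in a long user callback, and closeWithDeadline, the 100 ms grace period, and the placement of the second join are all hypothetical choices made for the example.

```java
import java.time.Duration;

public class BoundedCloseSketch {

    // Sketch of a close() that never joins without a bound: the graceful
    // join consumes the caller's budget, and the post-force-close join is
    // capped by a short grace period instead of ioThread.join() with no args.
    static boolean closeWithDeadline(Thread ioThread, Duration timeout)
            throws InterruptedException {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();

        // Graceful phase: wait up to the full caller-supplied budget.
        long remainingMs = Math.max(0, (deadlineNanos - System.nanoTime()) / 1_000_000);
        ioThread.join(remainingMs);

        if (ioThread.isAlive()) {
            // Force-close phase (in the real producer: sender.forceClose()).
            // Join again, but only for a short, bounded grace period.
            ioThread.join(100);
        }
        return !ioThread.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate an io thread stuck in a very long user-provided callback.
        Thread stuck = new Thread(() -> {
            try { Thread.sleep(60_000); } catch (InterruptedException ignored) { }
        });
        stuck.setDaemon(true);
        stuck.start();

        long start = System.nanoTime();
        boolean closed = closeWithDeadline(stuck, Duration.ofMillis(200));
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        // close() returns promptly even though the thread is still running.
        System.out.println("closed=" + closed + " elapsedMs<5000=" + (elapsedMs < 5_000));
    }
}
```

With an unbounded second join, this program would hang for the full 60 seconds; with the bounded variant it returns in roughly the 200 ms budget plus the grace period.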