[
https://issues.apache.org/jira/browse/KAFKA-16768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18062171#comment-18062171
]
Christo Lolov edited comment on KAFKA-16768 at 3/3/26 3:58 PM:
---------------------------------------------------------------
Hello [~cyh2012aa] and [~jainamshah102]! I am happy to review work on this
ticket. To check my understanding: did you consider a second draining pass in
Processor#close? Something like:
    def close(): Unit = {
      try {
        beginShutdown()
        thread.join()
        if (!started.get) {
          Utils.swallow(this.logger.underlying, Level.ERROR, () => closeAll())
        }
        // Second pass at draining connections: close any SocketChannel that an
        // Acceptor queued after the Processor thread's own closeAll() ran.
        // closeStraggleConnections() is the proposed new helper.
        closeStraggleConnections()
      } finally {
        metricsGroup.removeMetric("IdlePercent", Map("networkProcessor" -> id.toString).asJava)
        metrics.removeMetric(expiredConnectionsKilledCountMetricName)
      }
    }
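A minimal, single-threaded Scala sketch (written as a script for scala-cli or the REPL) of why a second drain helps. `FakeChannel` and `ModelProcessor` are hypothetical stand-ins, not Kafka classes, and `closeStraggleConnections` is the proposed helper from the snippet above, modelled here as a plain drain of a `ConcurrentLinkedQueue`. The sketch assumes the second pass runs only after every Acceptor that could call `accept()` has stopped; a channel queued after the second pass would still leak.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for java.nio.channels.SocketChannel: only tracks open/closed.
final class FakeChannel(val id: Int) {
  @volatile var open: Boolean = true
  def close(): Unit = { open = false }
}

// Simplified model of a Processor's shutdown path; not Kafka's actual Processor class.
final class ModelProcessor {
  val newConnections = new ConcurrentLinkedQueue[FakeChannel]()

  // Acceptor side of the handoff (cf. Processor#accept).
  def accept(ch: FakeChannel): Unit = newConnections.add(ch)

  // Polls until the queue is empty, closing each channel; returns how many were closed.
  private def drainAndClose(): Int = {
    var closed = 0
    var ch = newConnections.poll()
    while (ch != null) {
      ch.close()
      closed += 1
      ch = newConnections.poll()
    }
    closed
  }

  // First pass: what the Processor thread does as it leaves its run loop.
  def closeAll(): Int = drainAndClose()

  // Hypothetical second pass, called from close() after thread.join().
  def closeStraggleConnections(): Int = drainAndClose()
}

val processor = new ModelProcessor
val late = new FakeChannel(7)

processor.closeAll()      // first pass runs while the queue is still empty
processor.accept(late)    // an Acceptor queues a channel after that first pass
val closedLate = processor.closeStraggleConnections()

println(s"closed=$closedLate open=${late.open} queued=${processor.newConnections.size}")
// prints: closed=1 open=false queued=0
```

Both passes share one drain routine, so the second pass adds no new closing logic; it only re-runs the drain at a point where the Processor thread is known to have exited.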
> SocketServer leaks accepted SocketChannel instances due to race condition
> -------------------------------------------------------------------------
>
> Key: KAFKA-16768
> URL: https://issues.apache.org/jira/browse/KAFKA-16768
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 3.8.0
> Reporter: Greg Harris
> Assignee: Chang-Yu Huang
> Priority: Major
> Labels: newbie
>
> The SocketServer has threads for Acceptors and Processors. These threads
> communicate through Processor#accept and Processor#configureNewConnections,
> which share the `newConnections` queue.
> During shutdown, the Acceptor and Processors are each stopped by setting
> shouldRun to false, after which all instances shut down asynchronously and
> concurrently. This leads to a race condition in which an Acceptor accepts a
> SocketChannel and queues it to a Processor that has already started shutting
> down and has already drained its newConnections queue.
> KAFKA-16765 is an analogous bug in NioEchoServer, which uses a completely
> different implementation but has the same flaw.
> An example execution order that includes this leak:
> 1. Acceptor#accept() is called, and a new SocketChannel is accepted.
> 2. Acceptor#assignNewConnection() begins.
> 3. Acceptor#close() is called, which sets shouldRun to false in the Acceptor
> and its attached Processor instances.
> 4. Processor#run() checks the shouldRun variable and exits the loop.
> 5. Processor#closeAll() executes and drains the `newConnections` queue.
> 6. Processor#run() returns and the Processor thread terminates.
> 7. Acceptor#assignNewConnection() calls Processor#accept(), which adds the
> SocketChannel to `newConnections`.
> 8. Acceptor#assignNewConnection() returns.
> 9. Acceptor#run() checks the shouldRun variable and exits the loop, and the
> Acceptor thread terminates.
> 10. Acceptor#close() joins all of the terminated threads and returns.
> At the end of this sequence, open SocketChannel instances remain in
> newConnections; because the Processor thread has already terminated, nothing
> closes them and they are leaked.
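The execution order above can be replayed deterministically on a single thread. This is a minimal Scala script sketch; `FakeChannel` and `ModelProcessor` are hypothetical stand-ins, not Kafka classes, and they model only the `shouldRun` flag, the `newConnections` queue, `accept()` and the `closeAll()` drain.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for java.nio.channels.SocketChannel: only tracks open/closed.
final class FakeChannel(val id: Int) {
  @volatile var open: Boolean = true
  def close(): Unit = { open = false }
}

// Simplified model of one Processor; not Kafka's actual Processor class.
final class ModelProcessor {
  @volatile var shouldRun: Boolean = true
  val newConnections = new ConcurrentLinkedQueue[FakeChannel]()

  // Acceptor side of the handoff (cf. Processor#accept).
  def accept(ch: FakeChannel): Unit = newConnections.add(ch)

  // Shutdown drain (cf. Processor#closeAll): close whatever is queued right now.
  def closeAll(): Unit = {
    var ch = newConnections.poll()
    while (ch != null) {
      ch.close()
      ch = newConnections.poll()
    }
  }
}

// Replay the ticket's interleaving on one thread so the outcome is deterministic.
val processor = new ModelProcessor
val accepted = new FakeChannel(1)  // 1. Acceptor#accept() yields a new channel
                                   // 2. Acceptor#assignNewConnection() begins
processor.shouldRun = false        // 3. Acceptor#close() clears shouldRun
if (!processor.shouldRun) {        // 4. Processor#run() sees shouldRun == false
  processor.closeAll()             // 5. closeAll() drains an empty queue
}                                  // 6. the Processor thread would terminate here
processor.accept(accepted)         // 7. the channel is queued after the drain
                                   // 8-10. Acceptor returns, threads are joined

println(s"queued=${processor.newConnections.size} open=${accepted.open}")
// prints: queued=1 open=true
```

Channel 1 ends up queued and still open, with no running Processor left to poll the queue, which is the leak the ticket describes.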