> On Sept. 4, 2015, 4:07 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, lines > > 362-367 > > <https://reviews.apache.org/r/36858/diff/6/?file=1063447#file1063447line362> > > > > But those closed connections are only added to the disconnected list on > > the next selector.select() call right? So, you still have the issue that > > after a networkClient.poll() call, some socket connections are already > > cancelled but the connectionState is not reflecting that (need to wait for > > next poll() call). Also, it's a bit weird that the disconnect is initated > > in NetworkClient, but we have to push the info through selector and get it > > back. > > > > I was thinking that we can call handleTimedOutRequests() after > > handleConnections(). In that call, we first figure out the nodeIds that > > need to be closed. Then call selector.close() and for each such node, call > > the following code. > > > > connectionStates.disconnected(node); > > log.debug("Node {} disconnected.", node); > > for (ClientRequest request : > > this.inFlightRequests.clearAll(node)) { > > log.trace("Cancelled request {} due to node {} being > > disconnected", request, node); > > if (!metadataUpdater.maybeHandleDisconnection(request)) > > responses.add(new ClientResponse(request, now, > > true, null)); > > } > > > > The above code can be put in a private method and be reused in > > handleDisconnected(). > > > > Then, we can get rid of selector.disconect and replace existing usage > > with selector.close instead.
Hi Jun, Thanks a lot for the comments. For the concern : "But those closed connections are only added to the disconnected list on the next selector.select() call right? So, you still have the issue that after a networkClient.poll() call, some socket connections are already cancelled but the connectionState is not reflecting that (need to wait for next poll() call)", I am adding the explicitly disconnected nodes to the list of disconnected nodes in the same NetworkClient.poll() itself. I add it to a list called clientDisconnects. When the NetworkClient calls handleDisconnections(), it calls selector.disconnected(). Inside selector.disconnected(), I check if there are any entries in clientDisconnects and add thenm to the disconnected list and clear the clientDisconnects list. This means that every call to disconnected will get you all the disconnected nodes in that NetworkClient.poll(). So the explicitly disconnected nodes are added to the disconnected list in the same NetworkClient.poll(). But this appraoch kind of changes what selector.disconnect() javadoc says. I thought that if client is explicltiy calling disconnect() he would expect the disconnection to happen immidiately. The approach that you suggested above avoids making this change and I agree with that. I think Kafka-2411 kind of introduced some changes and I will need to rebase this patch with that. I am facing some issues when I rebased (test failures), so I will try to get those solved, include the approach that you have suggested above and upload a new patch. - Mayuresh ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36858/#review97710 ----------------------------------------------------------- On Sept. 3, 2015, 10:12 p.m., Mayuresh Gharat wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/36858/ > ----------------------------------------------------------- > > (Updated Sept. 3, 2015, 10:12 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-2120 > https://issues.apache.org/jira/browse/KAFKA-2120 > > > Repository: kafka > > > Description > ------- > > Solved compile error > > > Addressed Jason's comments for Kip-19 > > > Addressed Jun's comments > > > Addressed Jason's comments about the default values for requestTimeout > > > checkpoint > > > Addressed Joel's concerns. Also tried to include Jun's feedback. > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/ClientRequest.java > dc8f0f115bcda893c95d17c0a57be8d14518d034 > clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java > 7d24c6f5dd2b63b96584f3aa8922a1d048dc1ae4 > clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java > 15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 > clients/src/main/java/org/apache/kafka/clients/KafkaClient.java > 7ab2503794ff3aab39df881bd9fbae6547827d3b > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java > b31f7f1fbf93d29268b93811c9aad3e3c18e5312 > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java > b9a2d4e2bc565f0ee72b27791afe5c894af262f1 > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java > 938981c23ec16dfaf81d1e647929a59e1572f40f > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java > 9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > 804d569498396d431880641041fc9292076452cb > clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java > 06f00a99a73a288df9afa8c1d4abe3580fa968a6 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java > 4cb1e50d6c4ed55241aeaef1d3af09def5274103 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java > a152bd7697dca55609a9ec4cfe0a82c10595fbc3 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java > 06182db1c3a5da85648199b4c0c98b80ea7c6c0c > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java > d2e64f7cd8bf56e433a210905b2874f71eee9ea0 > clients/src/main/java/org/apache/kafka/common/network/Selector.java > f49d54cbc1915ac686ff70ac657f08e4c96489c1 > clients/src/test/java/org/apache/kafka/clients/MockClient.java > 9133d85342b11ba2c9888d4d2804d181831e7a8e > clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java > 43238ceaad0322e39802b615bb805b895336a009 > > clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java > 2c693824fa53db1e38766b8c66a0ef42ef9d0f3a > > clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java > 5b2e4ffaeab7127648db608c179703b27b577414 > > clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java > aa44991777a855f4b7f4f7bf17107c69393ff8ff > clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java > df1205c935bee9a30a50816dbade64d6014b1ef2 > clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java > 3a684d98b05cadfb25c6f7f9a038ef1f6697edbf > core/src/main/scala/kafka/tools/ProducerPerformance.scala > 0335cc64013ffe2cdf1c4879e86e11ec8c526712 > core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala > 1198df02ddd7727269e84a751ba99520f6d5584a > core/src/test/scala/unit/kafka/utils/TestUtils.scala > 09b8444c2add87f0f70dbb182e892977a6b5c243 > > Diff: https://reviews.apache.org/r/36858/diff/ > > > Testing > ------- > > > Thanks, > > Mayuresh Gharat > >