> On Sept. 4, 2015, 4:07 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, lines 
> > 362-367
> > <https://reviews.apache.org/r/36858/diff/6/?file=1063447#file1063447line362>
> >
> >     But those closed connections are only added to the disconnected list on 
> > the next selector.select() call right? So, you still have the issue that 
> > after a networkClient.poll() call, some socket connections are already 
> > cancelled but the connectionState is not reflecting that (need to wait for 
> > next poll() call). Also, it's a bit weird that the disconnect is initated 
> > in NetworkClient, but we have to push the info through selector and get it 
> > back.
> >     
> >     I was thinking that we can call handleTimedOutRequests() after 
> > handleConnections(). In that call, we first figure out the nodeIds that 
> > need to be closed. Then call selector.close() and for each such node, call 
> > the following code.
> >     
> >                 connectionStates.disconnected(node);
> >                 log.debug("Node {} disconnected.", node);
> >                 for (ClientRequest request : 
> > this.inFlightRequests.clearAll(node)) {
> >                     log.trace("Cancelled request {} due to node {} being 
> > disconnected", request, node);
> >                     if (!metadataUpdater.maybeHandleDisconnection(request))
> >                         responses.add(new ClientResponse(request, now, 
> > true, null));
> >                 }
> >     
> >     The above code can be put in a private method and be reused in 
> > handleDisconnected().
> >     
> >     Then, we can get rid of selector.disconect and replace existing usage 
> > with selector.close instead.

Hi Jun,

Thanks a lot for the comments.

For the concern :

"But those closed connections are only added to the disconnected list on the 
next selector.select() call right? So, you still have the issue that after a 
networkClient.poll() call, some socket connections are already cancelled but 
the connectionState is not reflecting that (need to wait for next poll() 
call)", I am adding the explicitly disconnected nodes to the list of 
disconnected nodes in the same NetworkClient.poll() itself. I add it to a list 
called clientDisconnects. When the  NetworkClient calls handleDisconnections(), 
it calls selector.disconnected(). Inside selector.disconnected(), I check if 
there are any entries in clientDisconnects and add thenm to the disconnected 
list and clear the clientDisconnects list. This means that every call to 
disconnected will get you all the disconnected nodes in that 
NetworkClient.poll(). So the explicitly disconnected nodes are added to the 
disconnected list in the same NetworkClient.poll().

But this appraoch kind of changes what selector.disconnect() javadoc says. I 
thought that if client is explicltiy calling disconnect() he would expect the 
disconnection to happen immidiately. The approach that you suggested above 
avoids making this change and I agree with that. 

I think Kafka-2411 kind of introduced some changes and I will need to rebase 
this patch with that. I am facing some issues when I rebased (test failures), 
so I will try to get those solved, include the approach that you have suggested 
above and upload a new patch.


- Mayuresh


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36858/#review97710
-----------------------------------------------------------


On Sept. 3, 2015, 10:12 p.m., Mayuresh Gharat wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36858/
> -----------------------------------------------------------
> 
> (Updated Sept. 3, 2015, 10:12 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2120
>     https://issues.apache.org/jira/browse/KAFKA-2120
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Solved compile error
> 
> 
> Addressed Jason's comments for Kip-19
> 
> 
> Addressed Jun's comments
> 
> 
> Addressed Jason's comments about the default values for requestTimeout
> 
> 
> checkpoint
> 
> 
> Addressed Joel's concerns. Also tried to include Jun's feedback.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
> dc8f0f115bcda893c95d17c0a57be8d14518d034 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
> 7d24c6f5dd2b63b96584f3aa8922a1d048dc1ae4 
>   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
> 15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
> 7ab2503794ff3aab39df881bd9fbae6547827d3b 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> b31f7f1fbf93d29268b93811c9aad3e3c18e5312 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> b9a2d4e2bc565f0ee72b27791afe5c894af262f1 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> 938981c23ec16dfaf81d1e647929a59e1572f40f 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
>  9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 804d569498396d431880641041fc9292076452cb 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 06f00a99a73a288df9afa8c1d4abe3580fa968a6 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
>  4cb1e50d6c4ed55241aeaef1d3af09def5274103 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  a152bd7697dca55609a9ec4cfe0a82c10595fbc3 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  06182db1c3a5da85648199b4c0c98b80ea7c6c0c 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> d2e64f7cd8bf56e433a210905b2874f71eee9ea0 
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
> f49d54cbc1915ac686ff70ac657f08e4c96489c1 
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
> 9133d85342b11ba2c9888d4d2804d181831e7a8e 
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
> 43238ceaad0322e39802b615bb805b895336a009 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
>  2c693824fa53db1e38766b8c66a0ef42ef9d0f3a 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
>  5b2e4ffaeab7127648db608c179703b27b577414 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
>  aa44991777a855f4b7f4f7bf17107c69393ff8ff 
>   clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java 
> df1205c935bee9a30a50816dbade64d6014b1ef2 
>   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
> 3a684d98b05cadfb25c6f7f9a038ef1f6697edbf 
>   core/src/main/scala/kafka/tools/ProducerPerformance.scala 
> 0335cc64013ffe2cdf1c4879e86e11ec8c526712 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> 1198df02ddd7727269e84a751ba99520f6d5584a 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 09b8444c2add87f0f70dbb182e892977a6b5c243 
> 
> Diff: https://reviews.apache.org/r/36858/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Mayuresh Gharat
> 
>

Reply via email to