----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36858/#review97213 -----------------------------------------------------------
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java (lines 361 - 366) <https://reviews.apache.org/r/36858/#comment153008> We need to think through this logic a bit more. The patch here disconnects the connection from the selector, but doesn't mark the connectionState as disconnected immediately. Only until the next networkClient.poll(), does the connectionState change to disconnected. The issue with this approach is that selector.disconnect actually cancels the socket key. So, at that moment, the connection is no longer usable. However, the connectionState is still connected. A client can check the connection as ready and then makes a send call. The send will then get a CancelledKeyException, which is weird. We are dealing with a similar issue in KAFKA-2411. Our current thinking is to have a networkClient.disconnect() that closes the socket key as well as removes the client from connectionState. This will make the state in networkClient consistent after each poll() call. If we have that, we can just call networkClient.disconnect() in handleTimedOutRequests() and handle those disconnected connections immediately. Then, we don't need to maintain the clientDisconnects state in Selector. - Jun Rao On Aug. 12, 2015, 5:59 p.m., Mayuresh Gharat wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/36858/ > ----------------------------------------------------------- > > (Updated Aug. 12, 2015, 5:59 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-2120 > https://issues.apache.org/jira/browse/KAFKA-2120 > > > Repository: kafka > > > Description > ------- > > Solved compile error > > > Addressed Jason's comments for Kip-19 > > > Addressed Jun's comments > > > Addressed Jason's comments about the default values for requestTimeout > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/ClientRequest.java > dc8f0f115bcda893c95d17c0a57be8d14518d034 > clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java > 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b > clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java > 15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 > clients/src/main/java/org/apache/kafka/clients/KafkaClient.java > 7ab2503794ff3aab39df881bd9fbae6547827d3b > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java > 0e51d7bd461d253f4396a5b6ca7cd391658807fa > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java > d35b421a515074d964c7fccb73d260b847ea5f00 > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java > ed99e9bdf7c4ea7a6d4555d4488cf8ed0b80641b > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java > 9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 > clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java > aa264202f2724907924985a5ecbe74afc4c6c04b > > clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java > 4cb1e50d6c4ed55241aeaef1d3af09def5274103 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java > a152bd7697dca55609a9ec4cfe0a82c10595fbc3 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java > 06182db1c3a5da85648199b4c0c98b80ea7c6c0c > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java > 0baf16e55046a2f49f6431e01d52c323c95eddf0 > clients/src/main/java/org/apache/kafka/common/network/Selector.java > ce20111ac434eb8c74585e9c63757bb9d60a832f > clients/src/test/java/org/apache/kafka/clients/MockClient.java > 9133d85342b11ba2c9888d4d2804d181831e7a8e > clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java > 43238ceaad0322e39802b615bb805b895336a009 > > clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java > 2c693824fa53db1e38766b8c66a0ef42ef9d0f3a > > clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java > 5b2e4ffaeab7127648db608c179703b27b577414 > > clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java > 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 > clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java > 158f9829ff64a969008f699e40c51e918287859e > core/src/main/scala/kafka/tools/ProducerPerformance.scala > 0335cc64013ffe2cdf1c4879e86e11ec8c526712 > core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala > ee94011894b46864614b97bbd2a98375a7d3f20b > core/src/test/scala/unit/kafka/utils/TestUtils.scala > eb169d8b33c27d598cc24e5a2e5f78b789fa38d3 > > Diff: https://reviews.apache.org/r/36858/diff/ > > > Testing > ------- > > > Thanks, > > Mayuresh Gharat > >