> On May 6, 2015, 10:50 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/network/Selector.java, lines 311-315
> > <https://reviews.apache.org/r/33065/diff/4/?file=947532#file947532line311>
> >
> >     In this case, we should probably just propagate the exception and
> >     potentially kill the caller, instead of continuing, right?
>
> Gwen Shapira wrote:
>     Are you saying this due to the warning or the exception itself?
>
>     The warning is misplaced - we don't validate requests at this layer.
>     Illegal state exception will be due to our own bugs mostly - such as
>     attempting to send data while another send is in progress. I think continuing
>     here is fine?
>
>     I'm fixing the error message and adding code to handle invalid requests
>     in SocketServer. The SocketServer exception handler will basically print an
>     error, close the connection and continue.
>
> Jun Rao wrote:
>     If we get into an IllegalStateException, the state may not be fixable.
>     So, it's probably better to let the caller deal with it. On the server side,
>     we will probably just log an error and stop the processor thread.
>
> Gwen Shapira wrote:
>     That's a good point - we don't know what caused us to get to an illegal
>     state, and therefore don't know if the rest of the connections for the
>     processor are OK or not.
>
>     However, in the current server code, we never close a processor due to
>     errors. We close connections, but processors stick around. This makes sense
>     since we never re-open processors, so closing the processor permanently
>     reduces the server's capacity.
>
>     I can move handling to the SocketServer (actually, the docs say that we
>     throw this exception - so I have to move it), but I still think the right
>     handling is to log an error and keep trying to process other connections.

Never mind. Even though I dislike losing a processor, we have no choice: there's no connection to close, since the illegal state may be due to the connection not existing in the first place.


- Gwen


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review82625
-----------------------------------------------------------


On May 14, 2015, 9:51 p.m., Gwen Shapira wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33065/
> -----------------------------------------------------------
>
> (Updated May 14, 2015, 9:51 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: 1928 and KAFKA-1928
>     https://issues.apache.org/jira/browse/1928
>     https://issues.apache.org/jira/browse/KAFKA-1928
>
>
> Repository: kafka
>
>
> Description
> -------
>
> first pass on replacing Send
>
>
> implement maxSize and improved docs
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
>
> Conflicts:
>     core/src/main/scala/kafka/network/RequestChannel.scala
>
> moved selector out of abstract thread
>
>
> mid-way through putting selector in SocketServer
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
>
> Also, SocketServer is now using Selector. Still a bit messy - but all tests pass.
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
>
>
> renamed requestKey to connectionId to reflect new use and changed type from Any to String
>
>
> Following Jun's comments - moved MultiSend to client. Cleaned up destinations as well
>
>
> removed reify and remaining from send/receive API, per Jun.
> moved maybeCloseOldest() to Selector per Jay
>
>
> added idString to node API, changed written to int in Send API
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java da76cc2
>   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 936487b
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 1311f85
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 435fbb5
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java bdff518
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java e55ab11
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd52
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 8e336a3
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 187d000
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 1e943d6
>   clients/src/main/java/org/apache/kafka/common/Node.java f4e4186
>   clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 129ae82
>   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java c8213e1
>   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java fc0d168
>   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 68327cd
>   clients/src/main/java/org/apache/kafka/common/network/Receive.java 4e33078
>   clients/src/main/java/org/apache/kafka/common/network/Selectable.java b5f8d83
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 57de058
>   clients/src/main/java/org/apache/kafka/common/network/Send.java 5d321a0
>   clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 27cbf39
>   clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java PRE-CREATION
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java 5e3fab1
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 8b27889
>   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java d5b306b
>   clients/src/test/java/org/apache/kafka/test/MockSelector.java ea89b06
>   core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 1c3b380
>   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala a3b1b78
>   core/src/main/scala/kafka/api/ControlledShutdownRequest.scala fe81635
>   core/src/main/scala/kafka/api/FetchRequest.scala b038c15
>   core/src/main/scala/kafka/api/FetchResponse.scala 75aaf57
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 431190a
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 317daed
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala fa8bd6a
>   core/src/main/scala/kafka/api/OffsetRequest.scala 3d483bc
>   core/src/main/scala/kafka/api/ProducerRequest.scala 570b2da
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 5e14987
>   core/src/main/scala/kafka/api/TopicMetadataRequest.scala 363bae0
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 69f0397
>   core/src/main/scala/kafka/client/ClientUtils.scala 62394c0
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 31a2639
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala aa8d940
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 6cf13f0
>   core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b0b7be1
>   core/src/main/scala/kafka/network/BlockingChannel.scala 6e2a38e
>   core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala c0d7726
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala b95b73b
>   core/src/main/scala/kafka/network/ByteBufferSend.scala af30042
>   core/src/main/scala/kafka/network/Handler.scala a030033
>   core/src/main/scala/kafka/network/RequestChannel.scala 1d0024c
>   core/src/main/scala/kafka/network/RequestOrResponseSend.scala PRE-CREATION
>   core/src/main/scala/kafka/network/SocketServer.scala edf6214
>   core/src/main/scala/kafka/network/Transmission.scala 2827103
>   core/src/main/scala/kafka/producer/SyncProducer.scala 0f09951
>   core/src/main/scala/kafka/server/KafkaApis.scala 417960d
>   core/src/main/scala/kafka/server/KafkaServer.scala b7d2a28
>   core/src/main/scala/kafka/server/MessageSetSend.scala 5667648
>   core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala d2bac85
>   core/src/test/resources/log4j.properties 1b7d5d8
>   core/src/test/scala/other/kafka/TestOffsetManager.scala 9881bd3
>   core/src/test/scala/unit/kafka/network/SocketServerTest.scala 95d5621
>
> Diff: https://reviews.apache.org/r/33065/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Gwen Shapira
>
>
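
For reference, here is a rough sketch of the handling the thread above converges on: a connection-level error is logged and the loop continues, while an IllegalStateException propagated from the selector is logged and stops the processor. This is not the SocketServer or Selector code from the diff; ProcessorSketch, PollStep and Processor are hypothetical names made up for illustration, and the in-memory log list stands in for real logging.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ProcessorSketch {

    /** Hypothetical stand-in for one selector poll; not Kafka's Selector API. */
    public interface PollStep {
        void poll() throws IOException;
    }

    /** Hypothetical stand-in for a processor's run loop. */
    public static final class Processor {
        private final List<String> log = new ArrayList<>();
        private boolean running = true;
        private int completedPolls = 0;

        /** Polls up to maxIterations times, stopping early on an illegal state. */
        public void run(PollStep step, int maxIterations) {
            for (int i = 0; i < maxIterations && running; i++) {
                try {
                    step.poll();
                    completedPolls++;
                } catch (IOException e) {
                    // Connection-level failure: there is a connection to give up on,
                    // so record the error and keep serving the other connections.
                    log.add("connection error, closing connection: " + e.getMessage());
                } catch (IllegalStateException e) {
                    // Propagated by the selector: the state may not be fixable and
                    // there may be no connection to close, so stop this processor.
                    log.add("illegal state, stopping processor: " + e.getMessage());
                    running = false;
                }
            }
        }

        public boolean isRunning() { return running; }
        public int completedPolls() { return completedPolls; }
        public List<String> log() { return log; }
    }

    public static void main(String[] args) {
        Processor processor = new Processor();
        int[] calls = {0};
        processor.run(() -> {
            int n = ++calls[0];
            if (n == 2) throw new IOException("peer reset");
            if (n == 4) throw new IllegalStateException("send already in progress");
        }, 10);
        processor.log().forEach(System.out::println);
        System.out.println("running=" + processor.isRunning()
                + ", completedPolls=" + processor.completedPolls());
    }
}
```

The trade-off is the one raised in the thread: stopping the processor permanently reduces capacity, but after an illegal state there may be nothing narrower to close.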