> On May 6, 2015, 10:50 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/network/Selector.java, lines 
> > 311-315
> > <https://reviews.apache.org/r/33065/diff/4/?file=947532#file947532line311>
> >
> >     In this case, we should probably just propagate the exception and 
> > potentially kill the caller, instead of continue, right?
> 
> Gwen Shapira wrote:
>     Are you saying this due the the warning or the exception itself?
>     
>     The warning is misplaced - we don't validate requests at this later. 
> Illegal state exception will be due to our own bugs mostly - such as 
> attempting to send data while another send is in progress. I think continuing 
> here is fine?
>     
>     I'm fixing the error message and adding code to handle invalid requests 
> in SocketServer. The SocketServer exception handler will basically print an 
> error, close the connection and continue.
> 
> Jun Rao wrote:
>     If we get into an IllegalStateException, the state may not be fixable. 
> So, it's probably better to let the caller deal with it. On the server side, 
> we will probably just log an error and stop the processor thread.
> 
> Gwen Shapira wrote:
>     Thats a good point - we don't know what caused us to get to an illegal 
> state, and therefore don't know if they rest of the connections for the 
> processor are OK or not.
>     
>     However, in the current server code, we never close processor due to 
> errors. We close connections, but processors stick around. This makes sense 
> since we never re-open processors, so closing the processor permenantly 
> reduces the server's capacity. 
>     
>     I can move handling to the SocketServer (actually, the docs say that we 
> throw this exception - so I have to move it), but I still think the right 
> handling is to log an error and keep trying to process other connections.

Never mind. Even though I dislike losing a processor, we have no choice:
There's no connection to close, since the illegal state may be due to 
connection not existing in first place.


- Gwen


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review82625
-----------------------------------------------------------


On May 14, 2015, 9:51 p.m., Gwen Shapira wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33065/
> -----------------------------------------------------------
> 
> (Updated May 14, 2015, 9:51 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: 1928 and KAFKA-1928
>     https://issues.apache.org/jira/browse/1928
>     https://issues.apache.org/jira/browse/KAFKA-1928
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> first pass on replacing Send
> 
> 
> implement maxSize and improved docs
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> Conflicts:
>       core/src/main/scala/kafka/network/RequestChannel.scala
> 
> moved selector out of abstract thread
> 
> 
> mid-way through putting selector in SocketServer
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
> pass.
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> 
> renamed requestKey to connectionId to reflect new use and changed type from 
> Any to String
> 
> 
> Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
> as well
> 
> 
> removed reify and remaining from send/recieve API, per Jun. moved 
> maybeCloseOldest() to Selector per Jay
> 
> 
> added idString to node API, changed written to int in Send API
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
> da76cc2 
>   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
> 936487b 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 1311f85 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 435fbb5 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> bdff518 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> d301be4 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  e55ab11 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
>  ef9dd52 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 8e336a3 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 187d000 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> 1e943d6 
>   clients/src/main/java/org/apache/kafka/common/Node.java f4e4186 
>   
> clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
> 129ae82 
>   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
> c8213e1 
>   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
> fc0d168 
>   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
> 68327cd 
>   clients/src/main/java/org/apache/kafka/common/network/Receive.java 4e33078 
>   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
> b5f8d83 
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 57de058 
>   clients/src/main/java/org/apache/kafka/common/network/Send.java 5d321a0 
>   clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
> 27cbf39 
>   clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java 5e3fab1 
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
> 8b27889 
>   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
> d5b306b 
>   clients/src/test/java/org/apache/kafka/test/MockSelector.java ea89b06 
>   core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 1c3b380 
>   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala a3b1b78 
>   core/src/main/scala/kafka/api/ControlledShutdownRequest.scala fe81635 
>   core/src/main/scala/kafka/api/FetchRequest.scala b038c15 
>   core/src/main/scala/kafka/api/FetchResponse.scala 75aaf57 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 431190a 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 317daed 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala fa8bd6a 
>   core/src/main/scala/kafka/api/OffsetRequest.scala 3d483bc 
>   core/src/main/scala/kafka/api/ProducerRequest.scala 570b2da 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 5e14987 
>   core/src/main/scala/kafka/api/TopicMetadataRequest.scala 363bae0 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 69f0397 
>   core/src/main/scala/kafka/client/ClientUtils.scala 62394c0 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 31a2639 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala aa8d940 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 6cf13f0 
>   core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b0b7be1 
>   core/src/main/scala/kafka/network/BlockingChannel.scala 6e2a38e 
>   core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala c0d7726 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala b95b73b 
>   core/src/main/scala/kafka/network/ByteBufferSend.scala af30042 
>   core/src/main/scala/kafka/network/Handler.scala a030033 
>   core/src/main/scala/kafka/network/RequestChannel.scala 1d0024c 
>   core/src/main/scala/kafka/network/RequestOrResponseSend.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/SocketServer.scala edf6214 
>   core/src/main/scala/kafka/network/Transmission.scala 2827103 
>   core/src/main/scala/kafka/producer/SyncProducer.scala 0f09951 
>   core/src/main/scala/kafka/server/KafkaApis.scala 417960d 
>   core/src/main/scala/kafka/server/KafkaServer.scala b7d2a28 
>   core/src/main/scala/kafka/server/MessageSetSend.scala 5667648 
>   core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala d2bac85 
>   core/src/test/resources/log4j.properties 1b7d5d8 
>   core/src/test/scala/other/kafka/TestOffsetManager.scala 9881bd3 
>   core/src/test/scala/unit/kafka/network/SocketServerTest.scala 95d5621 
> 
> Diff: https://reviews.apache.org/r/33065/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Gwen Shapira
> 
>

Reply via email to