> On May 6, 2015, 10:50 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/network/Selector.java, lines 
> > 311-315
> > <https://reviews.apache.org/r/33065/diff/4/?file=947532#file947532line311>
> >
> >     In this case, we should probably just propagate the exception and 
> > potentially kill the caller, instead of continue, right?
> 
> Gwen Shapira wrote:
>     Are you saying this due to the warning or the exception itself?
>     
>     The warning is misplaced - we don't validate requests at this layer. 
> Illegal state exception will be due to our own bugs mostly - such as 
> attempting to send data while another send is in progress. I think continuing 
> here is fine?
>     
>     I'm fixing the error message and adding code to handle invalid requests 
> in SocketServer. The SocketServer exception handler will basically print an 
> error, close the connection and continue.
> 
> Jun Rao wrote:
>     If we get into an IllegalStateException, the state may not be fixable. 
> So, it's probably better to let the caller deal with it. On the server side, 
> we will probably just log an error and stop the processor thread.

That's a good point - we don't know what caused us to get to an illegal state, 
and therefore don't know if the rest of the connections for the processor are 
OK or not.

However, in the current server code, we never close a processor due to errors. 
We close connections, but processors stick around. This makes sense since we 
never re-open processors, so closing a processor permanently reduces the 
server's capacity. 

I can move handling to the SocketServer (actually, the docs say that we throw 
this exception - so I have to move it), but I still think the right handling is 
to log an error and keep trying to process other connections.
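
To make the proposal concrete, here is a rough sketch of the handling I have 
in mind. It is plain Java with made-up names (ProcessorLoopSketch, 
ConnectionTask, processOnce) and is not the actual SocketServer or Selector 
code. It only shows the shape: catch the IllegalStateException per connection, 
log it, close that one connection, and keep serving the rest.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; none of these names come from the Kafka code base.
class ProcessorLoopSketch {

    /** Stand-in for the per-connection work a processor does in one pass. */
    interface ConnectionTask {
        void run();
    }

    /**
     * Runs every connection's task once. If a task throws
     * IllegalStateException, that connection is "closed" (removed from the
     * map), the error is logged, and the loop moves on to the next connection.
     * Returns the ids of the connections that were closed, in failure order.
     */
    static List<String> processOnce(Map<String, ConnectionTask> connections) {
        List<String> closed = new ArrayList<>();
        // Iterate over a copy of the ids so we can remove entries safely.
        for (String id : new ArrayList<>(connections.keySet())) {
            try {
                connections.get(id).run();
            } catch (IllegalStateException e) {
                System.err.println("Closing connection " + id
                        + " after error: " + e.getMessage());
                connections.remove(id);
                closed.add(id);
            }
        }
        return closed;
    }
}
```

The alternative Jun describes would amount to not catching the exception here 
at all, so that it escapes processOnce and ends the processor thread.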


- Gwen


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review82625
-----------------------------------------------------------


On May 14, 2015, 9:51 p.m., Gwen Shapira wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33065/
> -----------------------------------------------------------
> 
> (Updated May 14, 2015, 9:51 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: 1928 and KAFKA-1928
>     https://issues.apache.org/jira/browse/1928
>     https://issues.apache.org/jira/browse/KAFKA-1928
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> first pass on replacing Send
> 
> 
> implement maxSize and improved docs
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> Conflicts:
>       core/src/main/scala/kafka/network/RequestChannel.scala
> 
> moved selector out of abstract thread
> 
> 
> mid-way through putting selector in SocketServer
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> Also, SocketServer is now using Selector. Still a bit messy - but all tests 
> pass.
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> 
> renamed requestKey to connectionId to reflect new use and changed type from 
> Any to String
> 
> 
> Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
> as well
> 
> 
> removed reify and remaining from send/receive API, per Jun. moved 
> maybeCloseOldest() to Selector per Jay
> 
> 
> added idString to node API, changed written to int in Send API
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
> da76cc2 
>   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
> 936487b 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 1311f85 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 435fbb5 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> bdff518 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> d301be4 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  e55ab11 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
>  ef9dd52 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 8e336a3 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 187d000 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> 1e943d6 
>   clients/src/main/java/org/apache/kafka/common/Node.java f4e4186 
>   
> clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
> 129ae82 
>   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
> c8213e1 
>   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
> fc0d168 
>   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
> 68327cd 
>   clients/src/main/java/org/apache/kafka/common/network/Receive.java 4e33078 
>   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
> b5f8d83 
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 57de058 
>   clients/src/main/java/org/apache/kafka/common/network/Send.java 5d321a0 
>   clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
> 27cbf39 
>   clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java 5e3fab1 
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
> 8b27889 
>   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
> d5b306b 
>   clients/src/test/java/org/apache/kafka/test/MockSelector.java ea89b06 
>   core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 1c3b380 
>   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala a3b1b78 
>   core/src/main/scala/kafka/api/ControlledShutdownRequest.scala fe81635 
>   core/src/main/scala/kafka/api/FetchRequest.scala b038c15 
>   core/src/main/scala/kafka/api/FetchResponse.scala 75aaf57 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 431190a 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 317daed 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala fa8bd6a 
>   core/src/main/scala/kafka/api/OffsetRequest.scala 3d483bc 
>   core/src/main/scala/kafka/api/ProducerRequest.scala 570b2da 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 5e14987 
>   core/src/main/scala/kafka/api/TopicMetadataRequest.scala 363bae0 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 69f0397 
>   core/src/main/scala/kafka/client/ClientUtils.scala 62394c0 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 31a2639 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala aa8d940 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 6cf13f0 
>   core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b0b7be1 
>   core/src/main/scala/kafka/network/BlockingChannel.scala 6e2a38e 
>   core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala c0d7726 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala b95b73b 
>   core/src/main/scala/kafka/network/ByteBufferSend.scala af30042 
>   core/src/main/scala/kafka/network/Handler.scala a030033 
>   core/src/main/scala/kafka/network/RequestChannel.scala 1d0024c 
>   core/src/main/scala/kafka/network/RequestOrResponseSend.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/SocketServer.scala edf6214 
>   core/src/main/scala/kafka/network/Transmission.scala 2827103 
>   core/src/main/scala/kafka/producer/SyncProducer.scala 0f09951 
>   core/src/main/scala/kafka/server/KafkaApis.scala 417960d 
>   core/src/main/scala/kafka/server/KafkaServer.scala b7d2a28 
>   core/src/main/scala/kafka/server/MessageSetSend.scala 5667648 
>   core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala d2bac85 
>   core/src/test/resources/log4j.properties 1b7d5d8 
>   core/src/test/scala/other/kafka/TestOffsetManager.scala 9881bd3 
>   core/src/test/scala/unit/kafka/network/SocketServerTest.scala 95d5621 
> 
> Diff: https://reviews.apache.org/r/33065/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Gwen Shapira
> 
>
