> On May 6, 2015, 10:50 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/network/Selector.java, lines 
> > 311-315
> > <https://reviews.apache.org/r/33065/diff/4/?file=947532#file947532line311>
> >
> >     In this case, we should probably just propagate the exception and 
> > potentially kill the caller, instead of continue, right?
> 
> Gwen Shapira wrote:
>     Are you saying this due the the warning or the exception itself?
>     
>     The warning is misplaced - we don't validate requests at this later. 
> Illegal state exception will be due to our own bugs mostly - such as 
> attempting to send data while another send is in progress. I think continuing 
> here is fine?
>     
>     I'm fixing the error message and adding code to handle invalid requests 
> in SocketServer. The SocketServer exception handler will basically print an 
> error, close the connection and continue.

If we get into an IllegalStateException, the state may not be fixable. So, it's 
probably better to let the caller deal with it. On the server side, we will 
probably just log an error and stop the processor thread.


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review82625
-----------------------------------------------------------


On May 12, 2015, 9:58 a.m., Gwen Shapira wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33065/
> -----------------------------------------------------------
> 
> (Updated May 12, 2015, 9:58 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: 1928 and KAFKA-1928
>     https://issues.apache.org/jira/browse/1928
>     https://issues.apache.org/jira/browse/KAFKA-1928
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> first pass on replacing Send
> 
> 
> implement maxSize and improved docs
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> Conflicts:
>       core/src/main/scala/kafka/network/RequestChannel.scala
> 
> moved selector out of abstract thread
> 
> 
> mid-way through putting selector in SocketServer
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> Also, SocketServer is now using Selector. Stil a bit messy - but all tests 
> pass.
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> 
> renamed requestKey to connectionId to reflect new use and changed type from 
> Any to String
> 
> 
> Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
> as well
> 
> 
> removed reify and remaining from send/recieve API, per Jun. moved 
> maybeCloseOldest() to Selector per Jay
> 
> 
> added idString to node API, changed written to int in Send API
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
> da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
>   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
> 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
> 1311f85847b022efec8cb05c450bb18231db6979 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> bdff518b732105823058e6182f445248b45dc388 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  e55ab11df4db0b0084f841a74cbcf819caf780d5 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
>  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 42b12928781463b56fc4a45d96bb4da2745b6d95 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 187d0004c8c46b6664ddaffecc6166d4b47351e5 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> b2db91ca14bbd17fef5ce85839679144fff3f689 
>   clients/src/main/java/org/apache/kafka/common/Node.java 
> f4e4186c7602787e58e304a2f1c293a633114656 
>   
> clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
> 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
>   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
> c8213e156ec9c9af49ee09f5238492318516aaa3 
>   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
> fc0d168324aaebb97065b0aafbd547a1994d76a7 
>   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
> 68327cd3a734fd429966d3e2016a2488dbbb19e5 
>   clients/src/main/java/org/apache/kafka/common/network/Receive.java 
> 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
>   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
> b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
> 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
>   clients/src/main/java/org/apache/kafka/common/network/Send.java 
> 5d321a09e470166a1c33639cf0cab26a3bce98ec 
>   clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
> 27cbf390c7f148ffa8c5abc154c72cbf0829715c 
>   clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
> 5e3fab13e3c02eb351558ec973b949b3d1196085 
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
> 8b278892883e63899b53e15efb9d8c926131e858 
>   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
> d5b306b026e788b4e5479f3419805aa49ae889f3 
>   clients/src/test/java/org/apache/kafka/test/MockSelector.java 
> ea89b06a4c9e5bb351201299cd3037f5226f0e6c 
>   core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
> 1c3b3802ac221d570e7610458e50518b4499e7ed 
>   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
> a3b1b78adb760eaeb029466b54f335a29caf3b0f 
>   core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 
> fe81635c864cec03ca1d4681c9c47c3fc4f975ee 
>   core/src/main/scala/kafka/api/FetchRequest.scala 
> b038c15186c0cbcc65b59479324052498361b717 
>   core/src/main/scala/kafka/api/FetchResponse.scala 
> 75aaf57fb76ec01660d93701a57ae953d877d81c 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
> 431190ab94afc4acfc14348a1fc720e17c071cea 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
> cf8e6acc426aef6eb19d862bf6a108a5fc37907a 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
> 67811a752a470bf9bdbc8c5419e8d6e20a006169 
>   core/src/main/scala/kafka/api/OffsetRequest.scala 
> 3d483bc7518ad76f9548772522751afb4d046b78 
>   core/src/main/scala/kafka/api/ProducerRequest.scala 
> 570b2da1d865086f9830aa919a49063abbbe574d 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 
> 5e14987c990fe561c01dac2909f5ed21a506e038 
>   core/src/main/scala/kafka/api/TopicMetadataRequest.scala 
> 363bae01752318f3849242b97a6619747697c1d9 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
> 69f0397b187a737b4ddf50e390d3c2f418ce6b5d 
>   core/src/main/scala/kafka/client/ClientUtils.scala 
> 62394c0d3813f19a443cf862c8bc6c5808be9f88 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
> cbef84ac76e62768981f74e71d451f2bda995275 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> aa8d9404a3e78a365df06404b79d0d8f694b4bd6 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
> 6cf13f0a1f7f31ff9367197a435e0ae4427b6438 
>   core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala 
> b0b7be14d494ae8c87f4443b52db69d273c20316 
>   core/src/main/scala/kafka/network/BlockingChannel.scala 
> 6e2a38eee8e568f9032f95c75fa5899e9715b433 
>   core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala 
> c0d77261353478232ab85591c182be57845b3f13 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
> b95b73b71252932867c60192b3d5b91efe99e122 
>   core/src/main/scala/kafka/network/ByteBufferSend.scala 
> af30042a4c713418ecd83b6c6c17dfcbdc101c62 
>   core/src/main/scala/kafka/network/Handler.scala 
> a0300336b8cb5a2d5be68b7b48bdbe045bf99324 
>   core/src/main/scala/kafka/network/RequestChannel.scala 
> 1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026 
>   core/src/main/scala/kafka/network/RequestOrResponseSend.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/SocketServer.scala 
> b9bedde336abf0f0ca623810d6720f4c5307c280 
>   core/src/main/scala/kafka/network/Transmission.scala 
> 2827103d7e57789bb04859bdeb9d4720c8bd060c 
>   core/src/main/scala/kafka/producer/SyncProducer.scala 
> 0f09951329a8a8f86bd4d1512e8d10eb151ddb43 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> b4004aa3a1456d337199aa1245fb0ae61f6add46 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> b7d2a2842e17411a823b93bdedc84657cbd62be1 
>   core/src/main/scala/kafka/server/MessageSetSend.scala 
> 566764850cc60b9d35a4b51abd89a8109f340f5d 
>   core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala 
> d2bac85e16a247b1326f63619711fb0bbbd2e82a 
>   core/src/test/resources/log4j.properties 
> 1b7d5d8f7d5fae7d272849715714781cad05d77b 
>   core/src/test/scala/other/kafka/TestOffsetManager.scala 
> 9881bd3dff0591f315bd53aea96d3c6e12a24cb6 
>   core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
> 95d562134c0414ddc3caaa1e1defeb246f585b0b 
> 
> Diff: https://reviews.apache.org/r/33065/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Gwen Shapira
> 
>

Reply via email to