> On May 6, 2015, 10:50 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/network/SocketServer.scala, line 455
> > <https://reviews.apache.org/r/33065/diff/4/?file=947565#file947565line455>
> >
> >     This is just registering for reads, not for writes, right?

Maybe I misunderstood, but it looks like selector.send registers for writing?


> On May 6, 2015, 10:50 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/common/network/Selector.java, lines 311-315
> > <https://reviews.apache.org/r/33065/diff/4/?file=947532#file947532line311>
> >
> >     In this case, we should probably just propagate the exception and
> >     potentially kill the caller, instead of continuing, right?

Are you saying this due to the warning or the exception itself?
The warning is misplaced: we don't validate requests at this layer. An IllegalStateException will mostly be due to our own bugs, such as attempting to send data while another send is in progress. I think continuing here is fine?

I'm fixing the error message and adding code to handle invalid requests in SocketServer. The SocketServer exception handler will basically print an error, close the connection and continue.


> On May 6, 2015, 10:50 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/network/SocketServer.scala, lines 367-373
> > <https://reviews.apache.org/r/33065/diff/4/?file=947565#file947565line367>
> >
> >     Currently, Selector exposes metrics per connection. While this works
> >     fine for producer/consumer clients since the number of brokers is typically
> >     small, I am concerned about having those on the server since there could be
> >     10s of thousands of connections in a broker.
> >
> >     Perhaps we can pass in a config to disable per node metric in Selector
> >     when used in the broker.
> >
> >     We probably should think through if there are any metrics in Selector
> >     that we want to expose in Coda Hale metrics.

This makes sense. Actually, the current code has metrics per Processor. I think we can go with a single Metrics object for all processors and tag each processor. It would be nice to have a tag for protocol too, but I don't see how we can do that with each processor handling all protocols.

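To make the "single Metrics object, one tag per processor" idea concrete, here is a rough sketch. It is not the patch and does not use Kafka's Metrics class: TaggedCounters, record() and value() are hypothetical names invented for illustration, and the "processor" tag name is an assumption. It only shows that one shared registry can keep per-processor series apart through tags.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for one shared metrics registry (NOT Kafka's Metrics class).
final class TaggedCounters {
    // One counter per (metric name, tag set); safe to update from several processor threads.
    private final ConcurrentHashMap<String, LongAdder> counters = new ConcurrentHashMap<>();

    // Build a stable key: the TreeMap sorts tags so that tag order does not matter.
    static String key(String name, Map<String, String> tags) {
        return name + new TreeMap<>(tags);
    }

    // Add a value to the counter identified by the name and tags.
    void record(String name, Map<String, String> tags, long amount) {
        counters.computeIfAbsent(key(name, tags), k -> new LongAdder()).add(amount);
    }

    // Read the current total, or 0 if nothing was recorded for this name and tags.
    long value(String name, Map<String, String> tags) {
        LongAdder adder = counters.get(key(name, tags));
        return adder == null ? 0L : adder.sum();
    }

    // Number of distinct (name, tags) series held by the registry.
    int size() {
        return counters.size();
    }

    public static void main(String[] args) {
        TaggedCounters shared = new TaggedCounters(); // single registry for all processors
        for (int processorId = 0; processorId < 3; processorId++) {
            Map<String, String> tags =
                Collections.singletonMap("processor", String.valueOf(processorId));
            shared.record("bytes-sent", tags, 100L * (processorId + 1));
        }
        // Prints 200: the series tagged processor=1
        System.out.println(shared.value("bytes-sent", Collections.singletonMap("processor", "1")));
        // Prints 3: one series per processor tag
        System.out.println(shared.size());
    }
}
```

The number of series grows with the number of processors rather than with the number of connections. A protocol tag would just be a second entry in the tag map, but as noted above a processor that handles all protocols has no single value to put there.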
- Gwen


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review82625
-----------------------------------------------------------


On May 12, 2015, 9:58 a.m., Gwen Shapira wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33065/
> -----------------------------------------------------------
>
> (Updated May 12, 2015, 9:58 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: 1928 and KAFKA-1928
>     https://issues.apache.org/jira/browse/1928
>     https://issues.apache.org/jira/browse/KAFKA-1928
>
>
> Repository: kafka
>
>
> Description
> -------
>
> first pass on replacing Send
>
>
> implement maxSize and improved docs
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
>
> Conflicts:
>     core/src/main/scala/kafka/network/RequestChannel.scala
>
> moved selector out of abstract thread
>
>
> mid-way through putting selector in SocketServer
>
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
>
> Also, SocketServer is now using Selector. Still a bit messy - but all tests pass.
>
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
>
>
> renamed requestKey to connectionId to reflect new use and changed type from Any to String
>
>
> Following Jun's comments - moved MultiSend to client. Cleaned up destinations as well
>
>
> removed reify and remaining from send/receive API, per Jun. moved maybeCloseOldest() to Selector per Jay
>
>
> added idString to node API, changed written to int in Send API
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java da76cc257b4cfe3c4bce7120a1f14c7f31ef8587
>   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 936487b16e7ac566f8bdcd39a7240ceb619fd30e
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 1311f85847b022efec8cb05c450bb18231db6979
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java bdff518b732105823058e6182f445248b45dc388
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java e55ab11df4db0b0084f841a74cbcf819caf780d5
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 42b12928781463b56fc4a45d96bb4da2745b6d95
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 187d0004c8c46b6664ddaffecc6166d4b47351e5
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b2db91ca14bbd17fef5ce85839679144fff3f689
>   clients/src/main/java/org/apache/kafka/common/Node.java f4e4186c7602787e58e304a2f1c293a633114656
>   clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 129ae827bccbd982ad93d56e46c6f5c46f147fe0
>   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java c8213e156ec9c9af49ee09f5238492318516aaa3
>   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java fc0d168324aaebb97065b0aafbd547a1994d76a7
>   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 68327cd3a734fd429966d3e2016a2488dbbb19e5
>   clients/src/main/java/org/apache/kafka/common/network/Receive.java 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a
>   clients/src/main/java/org/apache/kafka/common/network/Selectable.java b5f8d83e89f9026dc0853e5f92c00b2d7f043e22
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302
>   clients/src/main/java/org/apache/kafka/common/network/Send.java 5d321a09e470166a1c33639cf0cab26a3bce98ec
>   clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 27cbf390c7f148ffa8c5abc154c72cbf0829715c
>   clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java PRE-CREATION
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java 5e3fab13e3c02eb351558ec973b949b3d1196085
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 8b278892883e63899b53e15efb9d8c926131e858
>   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java d5b306b026e788b4e5479f3419805aa49ae889f3
>   clients/src/test/java/org/apache/kafka/test/MockSelector.java ea89b06a4c9e5bb351201299cd3037f5226f0e6c
>   core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 1c3b3802ac221d570e7610458e50518b4499e7ed
>   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala a3b1b78adb760eaeb029466b54f335a29caf3b0f
>   core/src/main/scala/kafka/api/ControlledShutdownRequest.scala fe81635c864cec03ca1d4681c9c47c3fc4f975ee
>   core/src/main/scala/kafka/api/FetchRequest.scala b038c15186c0cbcc65b59479324052498361b717
>   core/src/main/scala/kafka/api/FetchResponse.scala 75aaf57fb76ec01660d93701a57ae953d877d81c
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 431190ab94afc4acfc14348a1fc720e17c071cea
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala cf8e6acc426aef6eb19d862bf6a108a5fc37907a
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 67811a752a470bf9bdbc8c5419e8d6e20a006169
>   core/src/main/scala/kafka/api/OffsetRequest.scala 3d483bc7518ad76f9548772522751afb4d046b78
>   core/src/main/scala/kafka/api/ProducerRequest.scala 570b2da1d865086f9830aa919a49063abbbe574d
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 5e14987c990fe561c01dac2909f5ed21a506e038
>   core/src/main/scala/kafka/api/TopicMetadataRequest.scala 363bae01752318f3849242b97a6619747697c1d9
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 69f0397b187a737b4ddf50e390d3c2f418ce6b5d
>   core/src/main/scala/kafka/client/ClientUtils.scala 62394c0d3813f19a443cf862c8bc6c5808be9f88
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala cbef84ac76e62768981f74e71d451f2bda995275
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala aa8d9404a3e78a365df06404b79d0d8f694b4bd6
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 6cf13f0a1f7f31ff9367197a435e0ae4427b6438
>   core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b0b7be14d494ae8c87f4443b52db69d273c20316
>   core/src/main/scala/kafka/network/BlockingChannel.scala 6e2a38eee8e568f9032f95c75fa5899e9715b433
>   core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala c0d77261353478232ab85591c182be57845b3f13
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala b95b73b71252932867c60192b3d5b91efe99e122
>   core/src/main/scala/kafka/network/ByteBufferSend.scala af30042a4c713418ecd83b6c6c17dfcbdc101c62
>   core/src/main/scala/kafka/network/Handler.scala a0300336b8cb5a2d5be68b7b48bdbe045bf99324
>   core/src/main/scala/kafka/network/RequestChannel.scala 1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026
>   core/src/main/scala/kafka/network/RequestOrResponseSend.scala PRE-CREATION
>   core/src/main/scala/kafka/network/SocketServer.scala b9bedde336abf0f0ca623810d6720f4c5307c280
>   core/src/main/scala/kafka/network/Transmission.scala 2827103d7e57789bb04859bdeb9d4720c8bd060c
>   core/src/main/scala/kafka/producer/SyncProducer.scala 0f09951329a8a8f86bd4d1512e8d10eb151ddb43
>   core/src/main/scala/kafka/server/KafkaApis.scala b4004aa3a1456d337199aa1245fb0ae61f6add46
>   core/src/main/scala/kafka/server/KafkaServer.scala b7d2a2842e17411a823b93bdedc84657cbd62be1
>   core/src/main/scala/kafka/server/MessageSetSend.scala 566764850cc60b9d35a4b51abd89a8109f340f5d
>   core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala d2bac85e16a247b1326f63619711fb0bbbd2e82a
>   core/src/test/resources/log4j.properties 1b7d5d8f7d5fae7d272849715714781cad05d77b
>   core/src/test/scala/other/kafka/TestOffsetManager.scala 9881bd3dff0591f315bd53aea96d3c6e12a24cb6
>   core/src/test/scala/unit/kafka/network/SocketServerTest.scala 95d562134c0414ddc3caaa1e1defeb246f585b0b
>
> Diff: https://reviews.apache.org/r/33065/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Gwen Shapira
>
>