-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review82625
-----------------------------------------------------------


Thanks for the patch. Looks great overall! Some comments below.

Also, there seems to be a compilation error when running tests.


clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/33065/#comment133525>

    Instead of doing the id to string conversion every time, perhaps we can 
compute the string of the id once in Node and expose it through an idString() 
method?
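
    As a rough sketch of the idea (this is a simplified stand-in, not the 
actual Node class; only the idString() name comes from the comment above):

```java
// Hypothetical, simplified stand-in for Node; the real class carries more fields.
final class Node {
    private final int id;
    private final String idString; // string form of the id, computed once

    Node(int id) {
        this.id = id;
        this.idString = Integer.toString(id);
    }

    int id() {
        return id;
    }

    // Returns the cached string, so callers avoid a conversion on every call.
    String idString() {
        return idString;
    }
}
```

    Callers that currently convert the id on each use could then call 
node.idString() instead.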



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/33065/#comment133526>

    Perhaps change nodeId to node?



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/33065/#comment133527>

    Perhaps change id to node?



clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java
<https://reviews.apache.org/r/33065/#comment133529>

    Since we always use a 4 byte size, perhaps we should make both remaining() 
and writeTo() return int?



clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
<https://reviews.apache.org/r/33065/#comment133564>

    Not sure why we need to pass in expectedBytesToWrite. This can be computed 
from the sends.



clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
<https://reviews.apache.org/r/33065/#comment133532>

    Instead of moving back the cursor in the iterator, would it be simpler to 
maintain a current Send that's being iterated on?
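
    A minimal sketch of both MultiSend suggestions, under stated assumptions: 
SimpleSend, BufferSend and SketchMultiSend are hypothetical names, and the 
interface is a pared-down illustration that writes to a WritableByteChannel 
rather than the exact Send interface in the patch. The total size is derived 
from the sends, and a current field replaces moving the iterator cursor back.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Iterator;
import java.util.List;

// Hypothetical, pared-down send abstraction used only for this sketch.
interface SimpleSend {
    long remaining(); // bytes still to be written
    long writeTo(WritableByteChannel channel) throws IOException;
}

// Hypothetical buffer-backed send, here only to keep the sketch self-contained.
final class BufferSend implements SimpleSend {
    private final ByteBuffer buffer;

    BufferSend(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    public long remaining() {
        return buffer.remaining();
    }

    public long writeTo(WritableByteChannel channel) throws IOException {
        return channel.write(buffer);
    }
}

final class SketchMultiSend implements SimpleSend {
    private final Iterator<SimpleSend> rest;
    private SimpleSend current; // the send being written now, or null when all are done
    private long remaining;     // computed from the sends rather than passed in

    SketchMultiSend(List<SimpleSend> sends) {
        long total = 0;
        for (SimpleSend s : sends)
            total += s.remaining();
        this.remaining = total;
        this.rest = sends.iterator();
        advance();
    }

    // Moves forward to the next send that still has bytes; the iterator never rewinds.
    private void advance() {
        while (current == null || current.remaining() == 0) {
            if (!rest.hasNext()) {
                current = null;
                return;
            }
            current = rest.next();
        }
    }

    public long remaining() {
        return remaining;
    }

    public long writeTo(WritableByteChannel channel) throws IOException {
        long total = 0;
        while (current != null) {
            long written = current.writeTo(channel);
            total += written;
            remaining -= written;
            if (current.remaining() > 0)
                break; // channel could not take everything; resume from current next time
            advance();
        }
        return total;
    }
}
```

    With this shape, a partially written send simply stays in current until 
the next writeTo() call, so no cursor bookkeeping is needed.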



clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
<https://reviews.apache.org/r/33065/#comment133534>

    Request size should probably be renamed to receive size, since it can 
apply to either a request or a response.



clients/src/main/java/org/apache/kafka/common/network/Selector.java
<https://reviews.apache.org/r/33065/#comment133512>

    There is a followup patch in KAFKA-1282 that we haven't incorporated. 
Basically, the linked hash map needs to be in access order. Since we are moving 
code around, could you incorporate the change here?
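
    For reference, a small sketch of an access-ordered LinkedHashMap used for 
idle tracking. The class and method names here are hypothetical and this is 
not the KAFKA-1282 patch itself; the only library detail relied on is the 
three-argument LinkedHashMap constructor with accessOrder set to true.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that tracks the last activity time per connection id.
final class LruConnections {
    // accessOrder = true: put() and get() move the touched entry to the end,
    // so iteration starts at the least recently used connection.
    private final Map<String, Long> lastActiveNanos = new LinkedHashMap<>(16, 0.75f, true);

    void recordActivity(String connectionId, long nowNanos) {
        lastActiveNanos.put(connectionId, nowNanos);
    }

    // Removes and returns the least recently active connection id if it has been
    // idle for at least maxIdleNanos; returns null otherwise.
    String pollExpired(long nowNanos, long maxIdleNanos) {
        Iterator<Map.Entry<String, Long>> it = lastActiveNanos.entrySet().iterator();
        if (!it.hasNext())
            return null;
        Map.Entry<String, Long> oldest = it.next();
        if (nowNanos - oldest.getValue() < maxIdleNanos)
            return null;
        it.remove();
        return oldest.getKey();
    }
}
```

    With the default insertion order, a connection that was created early but 
is still busy would stay at the head of the map and look like the oldest one.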



clients/src/main/java/org/apache/kafka/common/network/Selector.java
<https://reviews.apache.org/r/33065/#comment133537>

    Instead of -1, perhaps we can reuse NetworkReceive.UNLIMITED?



clients/src/main/java/org/apache/kafka/common/network/Selector.java
<https://reviews.apache.org/r/33065/#comment133541>

    Could we just iterate the keySet directly instead of making a copy first?



clients/src/main/java/org/apache/kafka/common/network/Selector.java
<https://reviews.apache.org/r/33065/#comment133543>

    It's reasonable to have a size for Send. The server side only records sent 
bytes, instead of messages.



clients/src/main/java/org/apache/kafka/common/network/Selector.java
<https://reviews.apache.org/r/33065/#comment133544>

    In this case, we should probably just propagate the exception and 
potentially kill the caller, instead of continuing, right?



clients/src/main/java/org/apache/kafka/common/network/Selector.java
<https://reviews.apache.org/r/33065/#comment133548>

    Perhaps we could pass in connectionMaxIdleNanos to the constructor of 
Selector. Then this method can be private. For clients, we can default to max 
long so that they don't close idle connections, since the broker is already 
doing that. The broker can pass in the connectionMaxIdleNanos from config.



core/src/main/scala/kafka/network/RequestChannel.scala
<https://reviews.apache.org/r/33065/#comment133506>

    We probably don't need remoteAddress any more since that's part of 
connectionId now.



core/src/main/scala/kafka/network/SocketServer.scala
<https://reviews.apache.org/r/33065/#comment133395>

    Could we define both methods as override?



core/src/main/scala/kafka/network/SocketServer.scala
<https://reviews.apache.org/r/33065/#comment133567>

    Currently, Selector exposes metrics per connection. While this works fine 
for producer/consumer clients since the number of brokers is typically small, I 
am concerned about having those on the server since there could be tens of 
thousands of connections in a broker.

    Perhaps we can pass in a config to disable per-node metrics in Selector 
when used in the broker.

    We should probably think through whether there are any metrics in Selector 
that we want to expose in Coda Hale metrics.



core/src/main/scala/kafka/network/SocketServer.scala
<https://reviews.apache.org/r/33065/#comment133500>

    It seems that we don't need startSelectTime. We will need to expose the 
io-wait-ratio metric in selector to networkProcessor.IdlePercent. This can be 
done in a followup jira though if the patch is too big.



core/src/main/scala/kafka/network/SocketServer.scala
<https://reviews.apache.org/r/33065/#comment133495>

    No need for return value.



core/src/main/scala/kafka/network/SocketServer.scala
<https://reviews.apache.org/r/33065/#comment133507>

    Since this will be called on every request, I am wondering how much 
overhead this will add. selector.mute() will do an id to key lookup. There is 
overhead in computing the hash from a string id. I am wondering if we can 
optimize that by making the id to key map in Selector an identity map. This 
can also be done in a followup jira, if needed.
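
    To make the trade-off concrete, here is a small sketch (the class name and 
the connection id format are made up for illustration). An IdentityHashMap 
compares keys by reference, so a lookup only succeeds when the caller passes 
the very same String instance that was used at registration.

```java
import java.util.Map;

final class IdentityLookupDemo {
    // Returns {found with the same instance, found with an equal copy}.
    static boolean[] probe(Map<String, Integer> idToKey) {
        String id = new String("127.0.0.1:9092-127.0.0.1:51234"); // hypothetical id format
        String copy = new String(id); // equal contents, different object
        idToKey.put(id, 1);
        return new boolean[] { idToKey.containsKey(id), idToKey.containsKey(copy) };
    }
}
```

    Passing a java.util.HashMap finds the entry in both cases, while passing a 
java.util.IdentityHashMap finds it only for the original instance. The 
optimization therefore assumes that the processor always hands back the id 
object it registered, rather than rebuilding the string.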



core/src/main/scala/kafka/network/SocketServer.scala
<https://reviews.apache.org/r/33065/#comment133510>

    This is just registering for reads, not for writes, right?



core/src/main/scala/kafka/network/SocketServer.scala
<https://reviews.apache.org/r/33065/#comment133511>

    Do we need this to be info?



core/src/main/scala/kafka/producer/SyncProducer.scala
<https://reviews.apache.org/r/33065/#comment133523>

    Our convention is to not use () for methods that don't have side effects. 
So, just use response.payload.


- Jun Rao


On May 1, 2015, 10:45 p.m., Gwen Shapira wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33065/
> -----------------------------------------------------------
> 
> (Updated May 1, 2015, 10:45 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1928
>     https://issues.apache.org/jira/browse/KAFKA-1928
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> first pass on replacing Send
> 
> 
> implement maxSize and improved docs
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> Conflicts:
>       core/src/main/scala/kafka/network/RequestChannel.scala
> 
> moved selector out of abstract thread
> 
> 
> mid-way through putting selector in SocketServer
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> Also, SocketServer is now using Selector. Still a bit messy - but all tests 
> pass.
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> 
> renamed requestKey to connectionId to reflect new use and changed type from 
> Any to String
> 
> 
> Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
> as well
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
> da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
>   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
> 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
> 1311f85847b022efec8cb05c450bb18231db6979 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> b7ae595f2cc46e5dfe728bc3ce6082e9cd0b6d36 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  e55ab11df4db0b0084f841a74cbcf819caf780d5 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
>  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> b2db91ca14bbd17fef5ce85839679144fff3f689 
>   
> clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
> 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
>   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
> c8213e156ec9c9af49ee09f5238492318516aaa3 
>   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
> fc0d168324aaebb97065b0aafbd547a1994d76a7 
>   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
> 68327cd3a734fd429966d3e2016a2488dbbb19e5 
>   clients/src/main/java/org/apache/kafka/common/network/Receive.java 
> 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
>   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
> b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
> 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
>   clients/src/main/java/org/apache/kafka/common/network/Send.java 
> 5d321a09e470166a1c33639cf0cab26a3bce98ec 
>   clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
> 27cbf390c7f148ffa8c5abc154c72cbf0829715c 
>   clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
> 5e3fab13e3c02eb351558ec973b949b3d1196085 
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
> 8b278892883e63899b53e15efb9d8c926131e858 
>   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
> d5b306b026e788b4e5479f3419805aa49ae889f3 
>   clients/src/test/java/org/apache/kafka/test/MockSelector.java 
> ea89b06a4c9e5bb351201299cd3037f5226f0e6c 
>   core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
> 1c3b3802ac221d570e7610458e50518b4499e7ed 
>   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
> a3b1b78adb760eaeb029466b54f335a29caf3b0f 
>   core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 
> fe81635c864cec03ca1d4681c9c47c3fc4f975ee 
>   core/src/main/scala/kafka/api/FetchRequest.scala 
> b038c15186c0cbcc65b59479324052498361b717 
>   core/src/main/scala/kafka/api/FetchResponse.scala 
> 75aaf57fb76ec01660d93701a57ae953d877d81c 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
> 431190ab94afc4acfc14348a1fc720e17c071cea 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
> cf8e6acc426aef6eb19d862bf6a108a5fc37907a 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
> 67811a752a470bf9bdbc8c5419e8d6e20a006169 
>   core/src/main/scala/kafka/api/OffsetRequest.scala 
> 3d483bc7518ad76f9548772522751afb4d046b78 
>   core/src/main/scala/kafka/api/ProducerRequest.scala 
> 570b2da1d865086f9830aa919a49063abbbe574d 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 
> 5e14987c990fe561c01dac2909f5ed21a506e038 
>   core/src/main/scala/kafka/api/TopicMetadataRequest.scala 
> 363bae01752318f3849242b97a6619747697c1d9 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
> 69f0397b187a737b4ddf50e390d3c2f418ce6b5d 
>   core/src/main/scala/kafka/client/ClientUtils.scala 
> 62394c0d3813f19a443cf862c8bc6c5808be9f88 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
> cbef84ac76e62768981f74e71d451f2bda995275 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> aa8d9404a3e78a365df06404b79d0d8f694b4bd6 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
> 6cf13f0a1f7f31ff9367197a435e0ae4427b6438 
>   core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala 
> b0b7be14d494ae8c87f4443b52db69d273c20316 
>   core/src/main/scala/kafka/network/BlockingChannel.scala 
> 6e2a38eee8e568f9032f95c75fa5899e9715b433 
>   core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala 
> c0d77261353478232ab85591c182be57845b3f13 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
> b95b73b71252932867c60192b3d5b91efe99e122 
>   core/src/main/scala/kafka/network/ByteBufferSend.scala 
> af30042a4c713418ecd83b6c6c17dfcbdc101c62 
>   core/src/main/scala/kafka/network/Handler.scala 
> a0300336b8cb5a2d5be68b7b48bdbe045bf99324 
>   core/src/main/scala/kafka/network/RequestChannel.scala 
> 1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026 
>   core/src/main/scala/kafka/network/RequestOrResponseSend.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/SocketServer.scala 
> b9bedde336abf0f0ca623810d6720f4c5307c280 
>   core/src/main/scala/kafka/network/Transmission.scala 
> 2827103d7e57789bb04859bdeb9d4720c8bd060c 
>   core/src/main/scala/kafka/producer/SyncProducer.scala 
> 0f09951329a8a8f86bd4d1512e8d10eb151ddb43 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> b4004aa3a1456d337199aa1245fb0ae61f6add46 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> b7d2a2842e17411a823b93bdedc84657cbd62be1 
>   core/src/main/scala/kafka/server/MessageSetSend.scala 
> 566764850cc60b9d35a4b51abd89a8109f340f5d 
>   core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala 
> d2bac85e16a247b1326f63619711fb0bbbd2e82a 
>   core/src/test/resources/log4j.properties 
> 1b7d5d8f7d5fae7d272849715714781cad05d77b 
>   core/src/test/scala/other/kafka/TestOffsetManager.scala 
> 9881bd3dff0591f315bd53aea96d3c6e12a24cb6 
>   core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
> 95d562134c0414ddc3caaa1e1defeb246f585b0b 
> 
> Diff: https://reviews.apache.org/r/33065/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Gwen Shapira
> 
>
