-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33065/#review83959
-----------------------------------------------------------


Thanks for the patch. There seem to be some compilation errors. Perhaps it 
needs a rebase.

:core:compileScala
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala:68:
 type mismatch;
 found   : kafka.network.RequestOrResponseSend
 required: kafka.network.Send
    requestChannel.sendResponse(new Response(request, new 
RequestOrResponseSend(request.connectionId, errorResponse)))
                                                      ^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala:65:
 type mismatch;
 found   : kafka.network.RequestOrResponseSend
 required: kafka.network.Send
    requestChannel.sendResponse(new Response(request, new 
RequestOrResponseSend(request.connectionId, errorResponse)))
                                                      ^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/FetchRequest.scala:152:
 type mismatch;
 found   : kafka.api.FetchResponseSend
 required: kafka.network.Send
    requestChannel.sendResponse(new RequestChannel.Response(request, new 
FetchResponseSend(request.connectionId, errorResponse)))
                                                                     ^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala:189:
 type mismatch;
 found   : kafka.network.RequestOrResponseSend
 required: kafka.network.Send
    requestChannel.sendResponse(new Response(request, new 
RequestOrResponseSend(request.connectionId, errorResponse)))
                                                      ^

/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/OffsetCommitRequest.scala:78:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
            
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
                                                                 ^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/OffsetCommitRequest.scala:167:
 type mismatch;
 found   : kafka.network.RequestOrResponseSend
 required: kafka.network.Send
    requestChannel.sendResponse(new Response(request, new 
RequestOrResponseSend(request.connectionId, commitResponse)))
                                                      ^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/OffsetFetchRequest.scala:99:
 type mismatch;
 found   : kafka.network.RequestOrResponseSend
 required: kafka.network.Send
    requestChannel.sendResponse(new Response(request, new 
RequestOrResponseSend(request.connectionId, errorResponse)))
                                                      ^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/OffsetRequest.scala:121:
 type mismatch;
 found   : kafka.network.RequestOrResponseSend
 required: kafka.network.Send
    requestChannel.sendResponse(new Response(request, new 
RequestOrResponseSend(request.connectionId, errorResponse)))
                                                      ^
/Users/junrao/intellij/kafka/core/src/main/scala/kafka/network/RequestChannel.scala:29:
 imported `Send' is permanently hidden by definition of trait Send in package 
network
import org.apache.kafka.common.network.Send


clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
<https://reviews.apache.org/r/33065/#comment135068>

    We don't need this since idle connections only need to be closed on the 
server side.



clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
<https://reviews.apache.org/r/33065/#comment135072>

    We don't need this since idle connections only need to be closed on the 
server side.



clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
<https://reviews.apache.org/r/33065/#comment135073>

    Actually, with MultiSend, we will be sending a 4-byte size plus the 
payload, the sum of which could be a bit larger than max_int. So, I think we 
need to make writeTo and size return long instead in Send. Sorry for the 
incorrect suggestion earlier.
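
    A minimal sketch of the overflow, assuming a 4-byte size prefix as 
described above. The class and method names (FramedSizeSketch, 
framedSizeAsInt, framedSizeAsLong) are hypothetical and are not code from the 
patch.

```java
// Hypothetical illustration of the size overflow; not code from the patch.
final class FramedSizeSketch {
    // The size prefix written ahead of the payload.
    static final int SIZE_PREFIX_BYTES = 4;

    // int arithmetic: the sum silently wraps around once it passes Integer.MAX_VALUE.
    static int framedSizeAsInt(int payloadBytes) {
        return SIZE_PREFIX_BYTES + payloadBytes;
    }

    // long arithmetic: widen before adding so the sum cannot wrap.
    static long framedSizeAsLong(int payloadBytes) {
        return (long) SIZE_PREFIX_BYTES + payloadBytes;
    }

    public static void main(String[] args) {
        int payload = Integer.MAX_VALUE - 1;
        System.out.println(framedSizeAsInt(payload));  // a negative number
        System.out.println(framedSizeAsLong(payload)); // 2147483650
    }
}
```

    The payload alone still fits in an int here; only the sum with the prefix 
does not, which is why the return type matters.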



clients/src/main/java/org/apache/kafka/common/network/MultiSend.java
<https://reviews.apache.org/r/33065/#comment135074>

    To be consistent, there is no need to wrap a single-line statement in {}.



clients/src/main/java/org/apache/kafka/common/network/Selector.java
<https://reviews.apache.org/r/33065/#comment135059>

    It seems that we don't really need IdentityHashMap to optimize performance. 
According to the following link, HashMap performs comparably to IdentityHashMap 
on String keys since String caches its hash code.
    http://java-performance.info/java-util-identityhashmap/
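
    Separately from performance, the two maps differ in lookup semantics: 
IdentityHashMap compares keys by reference, HashMap by equals(). A small 
sketch of that difference follows. The class name, method name, and the 
"node-1" key are hypothetical, not taken from the patch.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical illustration; not code from the patch.
final class StringKeyMapSketch {
    // Stores a value under one String instance and looks it up with a
    // different instance that has the same content.
    static boolean foundWithEqualKey(Map<String, Integer> map) {
        String stored = new String("node-1"); // deliberately a distinct object
        String lookup = new String("node-1"); // equal content, different reference
        map.put(stored, 42);
        return map.containsKey(lookup);
    }

    public static void main(String[] args) {
        System.out.println(foundWithEqualKey(new HashMap<>()));         // true
        System.out.println(foundWithEqualKey(new IdentityHashMap<>())); // false
    }
}
```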



clients/src/main/java/org/apache/kafka/common/network/Selector.java
<https://reviews.apache.org/r/33065/#comment135056>

    We only need to close idle connections on the server side. So, we can use 
MAX_LONG for connectionMaxIdleMs for the clients. But we need to see if 
maybeCloseOldestConnection handles overflows well. Alternatively, we can just 
not maintain the lruConnection on the client side.
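
    To make the overflow concern concrete, here is a sketch of two ways an 
idle check could be written. This is not the actual logic of 
maybeCloseOldestConnection; the class and method names are hypothetical, and 
timestamps are assumed to be non-negative long values.

```java
// Hypothetical illustration of the overflow concern; not code from the patch.
final class IdleExpirySketch {
    // Adds the limit to the last-active time. With maxIdle == Long.MAX_VALUE
    // the sum wraps to a negative value, so the connection looks expired.
    static boolean isExpiredNaive(long lastActive, long maxIdle, long now) {
        return now > lastActive + maxIdle;
    }

    // Compares elapsed time against the limit instead, which does not
    // overflow for non-negative timestamps with now >= lastActive.
    static boolean isExpiredSafe(long lastActive, long maxIdle, long now) {
        return now - lastActive > maxIdle;
    }

    public static void main(String[] args) {
        long lastActive = 1000L;
        long now = 2000L;
        System.out.println(isExpiredNaive(lastActive, Long.MAX_VALUE, now)); // true (wrong)
        System.out.println(isExpiredSafe(lastActive, Long.MAX_VALUE, now));  // false
    }
}
```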



core/src/main/scala/kafka/api/FetchResponse.scala
<https://reviews.apache.org/r/33065/#comment135077>

    sends can be private.



core/src/main/scala/kafka/api/FetchResponse.scala
<https://reviews.apache.org/r/33065/#comment135078>

    sends can be private.



core/src/main/scala/kafka/network/SocketServer.scala
<https://reviews.apache.org/r/33065/#comment135015>

    Our convention is to use () for methods with side effects. So, we should do 
wakeup(). Same is true for shutdownHook.



core/src/main/scala/kafka/network/SocketServer.scala
<https://reviews.apache.org/r/33065/#comment135044>

    It seems that we can just call wakeup() and get rid of shutdownHook.



core/src/main/scala/kafka/network/SocketServer.scala
<https://reviews.apache.org/r/33065/#comment135052>

    I guess we will make use of this in a followup patch?



core/src/main/scala/kafka/network/SocketServer.scala
<https://reviews.apache.org/r/33065/#comment135016>

    This is not enough to get the thread out of the loop. Also, we probably 
don't want to continue with the rest of the logic. The easiest way is to 
rethrow the exception to kill the thread.
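
    A generic sketch of the difference between swallowing and rethrowing 
inside a loop. It is written in Java for illustration only and is not the 
SocketServer code; all names are hypothetical.

```java
// Hypothetical illustration; not the SocketServer code.
final class RethrowSketch {
    // Runs a loop that fails once at iteration failAt and returns how many
    // iterations executed. When rethrow is true the failure propagates to
    // the caller and ends the loop; otherwise it is swallowed and the loop
    // carries on.
    static int runLoop(int maxIterations, int failAt, boolean rethrow) {
        int executed = 0;
        for (int i = 0; i < maxIterations; i++) {
            try {
                executed++;
                if (i == failAt) {
                    throw new IllegalStateException("simulated failure");
                }
            } catch (IllegalStateException e) {
                if (rethrow) {
                    throw e; // leaves the loop and the enclosing method
                }
                // swallowed: the loop continues with the next iteration
            }
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(runLoop(5, 2, false)); // 5
        try {
            runLoop(5, 2, true);
        } catch (IllegalStateException e) {
            System.out.println("loop terminated: " + e.getMessage());
        }
    }
}
```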



core/src/main/scala/kafka/network/SocketServer.scala
<https://reviews.apache.org/r/33065/#comment135020>

    Actually, if the incoming request is invalid, the exception will be thrown 
here. So, we should handle the exception here. We probably should just handle 
Exception, instead of Throwable.
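
    For reference, a small Java sketch of what narrowing the catch to 
Exception changes: ordinary exceptions are handled, while Error subclasses 
still propagate. The names are hypothetical and the LinkageError merely 
stands in for any Error.

```java
// Hypothetical illustration; not code from the patch.
final class CatchScopeSketch {
    // Handles Exception only; anything that extends Error is not caught
    // here and reaches the caller.
    static String handle(Runnable work) {
        try {
            work.run();
            return "ok";
        } catch (Exception e) {
            return "handled " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // An invalid request modelled as an ordinary exception is handled.
        System.out.println(handle(() -> {
            throw new IllegalArgumentException("invalid request");
        })); // handled IllegalArgumentException

        // An Error passes through the catch block untouched.
        try {
            handle(() -> {
                throw new LinkageError("simulated error");
            });
        } catch (LinkageError e) {
            System.out.println("propagated " + e.getMessage());
        }
    }
}
```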



core/src/main/scala/kafka/network/SocketServer.scala
<https://reviews.apache.org/r/33065/#comment135019>

    Do we need to close again? We may hit the same exception as before. Also, 
see the comment above.



core/src/test/resources/log4j.properties
<https://reviews.apache.org/r/33065/#comment135080>

    These are probably not intended.


- Jun Rao


On May 15, 2015, 7:30 a.m., Gwen Shapira wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33065/
> -----------------------------------------------------------
> 
> (Updated May 15, 2015, 7:30 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: 1928 and KAFKA-1928
>     https://issues.apache.org/jira/browse/1928
>     https://issues.apache.org/jira/browse/KAFKA-1928
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> first pass on replacing Send
> 
> 
> implement maxSize and improved docs
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> Conflicts:
>       core/src/main/scala/kafka/network/RequestChannel.scala
> 
> moved selector out of abstract thread
> 
> 
> mid-way through putting selector in SocketServer
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> Also, SocketServer is now using Selector. Still a bit messy - but all tests 
> pass.
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> 
> renamed requestKey to connectionId to reflect new use and changed type from 
> Any to String
> 
> 
> Following Jun's comments - moved MultiSend to client. Cleaned up destinations 
> as well
> 
> 
> removed reify and remaining from send/receive API, per Jun. moved 
> maybeCloseOldest() to Selector per Jay
> 
> 
> added idString to node API, changed written to int in Send API
> 
> 
> cleaning up MultiSend, added size() to Send interface
> 
> 
> fixed some issues with multisend
> 
> 
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
> KAFKA-1928-v2
> 
> 
> fixed metric thingies
> 
> 
> fixed response order bug
> 
> 
> error handling for illegal selector state and fix metrics bug
> 
> 
> optimized selection key lookup with identity hash
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
> da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 
>   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
> 936487b16e7ac566f8bdcd39a7240ceb619fd30e 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
> 1311f85847b022efec8cb05c450bb18231db6979 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 435fbb5116e80302eba11ed1d3069cb577dbdcbd 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> bdff518b732105823058e6182f445248b45dc388 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  e55ab11df4db0b0084f841a74cbcf819caf780d5 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
>  ef9dd5238fbc771496029866ece1d85db6d7b7a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 8e336a3aa96c73f52beaeb56b931baf4b026cf21 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 187d0004c8c46b6664ddaffecc6166d4b47351e5 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> 1e943d621732889a1c005b243920dc32cea7af66 
>   clients/src/main/java/org/apache/kafka/common/Node.java 
> f4e4186c7602787e58e304a2f1c293a633114656 
>   
> clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 
> 129ae827bccbd982ad93d56e46c6f5c46f147fe0 
>   clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java 
> c8213e156ec9c9af49ee09f5238492318516aaa3 
>   clients/src/main/java/org/apache/kafka/common/network/MultiSend.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
> fc0d168324aaebb97065b0aafbd547a1994d76a7 
>   clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 
> 68327cd3a734fd429966d3e2016a2488dbbb19e5 
>   clients/src/main/java/org/apache/kafka/common/network/Receive.java 
> 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a 
>   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
> b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
> 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 
>   clients/src/main/java/org/apache/kafka/common/network/Send.java 
> 5d321a09e470166a1c33639cf0cab26a3bce98ec 
>   clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 
> 27cbf390c7f148ffa8c5abc154c72cbf0829715c 
>   clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java 
> PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
> 5e3fab13e3c02eb351558ec973b949b3d1196085 
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
> 8b278892883e63899b53e15efb9d8c926131e858 
>   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
> d5b306b026e788b4e5479f3419805aa49ae889f3 
>   clients/src/test/java/org/apache/kafka/test/MockSelector.java 
> ea89b06a4c9e5bb351201299cd3037f5226f0e6c 
>   core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
> 1c3b3802ac221d570e7610458e50518b4499e7ed 
>   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
> a3b1b78adb760eaeb029466b54f335a29caf3b0f 
>   core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 
> fe81635c864cec03ca1d4681c9c47c3fc4f975ee 
>   core/src/main/scala/kafka/api/FetchRequest.scala 
> b038c15186c0cbcc65b59479324052498361b717 
>   core/src/main/scala/kafka/api/FetchResponse.scala 
> 75aaf57fb76ec01660d93701a57ae953d877d81c 
>   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
> 431190ab94afc4acfc14348a1fc720e17c071cea 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
> 317daed18db8b02635927d81fbcad623f137de5e 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
> fa8bd6a145fd3f08a5f78fcfa813ed7417ccffd2 
>   core/src/main/scala/kafka/api/OffsetRequest.scala 
> 3d483bc7518ad76f9548772522751afb4d046b78 
>   core/src/main/scala/kafka/api/ProducerRequest.scala 
> 570b2da1d865086f9830aa919a49063abbbe574d 
>   core/src/main/scala/kafka/api/StopReplicaRequest.scala 
> 5e14987c990fe561c01dac2909f5ed21a506e038 
>   core/src/main/scala/kafka/api/TopicMetadataRequest.scala 
> 363bae01752318f3849242b97a6619747697c1d9 
>   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
> 69f0397b187a737b4ddf50e390d3c2f418ce6b5d 
>   core/src/main/scala/kafka/client/ClientUtils.scala 
> 62394c0d3813f19a443cf862c8bc6c5808be9f88 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
> 31a2639477bf66f9a05d2b9b07794572d7ec393b 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
> aa8d9404a3e78a365df06404b79d0d8f694b4bd6 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
> 6cf13f0a1f7f31ff9367197a435e0ae4427b6438 
>   core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala 
> b0b7be14d494ae8c87f4443b52db69d273c20316 
>   core/src/main/scala/kafka/network/BlockingChannel.scala 
> 6e2a38eee8e568f9032f95c75fa5899e9715b433 
>   core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala 
> c0d77261353478232ab85591c182be57845b3f13 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala 
> b95b73b71252932867c60192b3d5b91efe99e122 
>   core/src/main/scala/kafka/network/ByteBufferSend.scala 
> af30042a4c713418ecd83b6c6c17dfcbdc101c62 
>   core/src/main/scala/kafka/network/Handler.scala 
> a0300336b8cb5a2d5be68b7b48bdbe045bf99324 
>   core/src/main/scala/kafka/network/RequestChannel.scala 
> 1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026 
>   core/src/main/scala/kafka/network/RequestOrResponseSend.scala PRE-CREATION 
>   core/src/main/scala/kafka/network/SocketServer.scala 
> edf6214278935c031cf493d72d266e715d43dd06 
>   core/src/main/scala/kafka/network/Transmission.scala 
> 2827103d7e57789bb04859bdeb9d4720c8bd060c 
>   core/src/main/scala/kafka/producer/SyncProducer.scala 
> 0f09951329a8a8f86bd4d1512e8d10eb151ddb43 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> 417960dd1ab407ebebad8fdb0e97415db3e91a2f 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> b7d2a2842e17411a823b93bdedc84657cbd62be1 
>   core/src/main/scala/kafka/server/MessageSetSend.scala 
> 566764850cc60b9d35a4b51abd89a8109f340f5d 
>   core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala 
> d2bac85e16a247b1326f63619711fb0bbbd2e82a 
>   core/src/test/resources/log4j.properties 
> 1b7d5d8f7d5fae7d272849715714781cad05d77b 
>   core/src/test/scala/other/kafka/TestOffsetManager.scala 
> 9881bd3dff0591f315bd53aea96d3c6e12a24cb6 
>   core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
> 95d562134c0414ddc3caaa1e1defeb246f585b0b 
> 
> Diff: https://reviews.apache.org/r/33065/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Gwen Shapira
> 
>
