----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/#review83959 -----------------------------------------------------------
Thanks for the patch. There seem to be some compilation errors. Perhaps need to rebase. :core:compileScala/Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala:68: type mismatch; found : kafka.network.RequestOrResponseSend required: kafka.network.Send requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) ^ /Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/ControlledShutdownRequest.scala:65: type mismatch; found : kafka.network.RequestOrResponseSend required: kafka.network.Send requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) ^ /Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/FetchRequest.scala:152: type mismatch; found : kafka.api.FetchResponseSend required: kafka.network.Send requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, errorResponse))) ^ /Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala:189: type mismatch; found : kafka.network.RequestOrResponseSend required: kafka.network.Send requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) ^ /Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/OffsetCommitRequest.scala:78: value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see corresponding Javadoc for more information. org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP ^ /Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/OffsetCommitRequest.scala:167: type mismatch; found : kafka.network.RequestOrResponseSend required: kafka.network.Send requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, commitResponse))) ^ /Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/OffsetFetchRequest.scala:99: type mismatch; found : kafka.network.RequestOrResponseSend required: kafka.network.Send requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) ^ /Users/junrao/intellij/kafka/core/src/main/scala/kafka/api/OffsetRequest.scala:121: type mismatch; found : kafka.network.RequestOrResponseSend required: kafka.network.Send requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse))) ^ /Users/junrao/intellij/kafka/core/src/main/scala/kafka/network/RequestChannel.scala:29: imported `Send' is permanently hidden by definition of trait Send in package network import org.apache.kafka.common.network.Send clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java <https://reviews.apache.org/r/33065/#comment135068> We don't need this since the idle conenctions only need to be closed on the server side. clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java <https://reviews.apache.org/r/33065/#comment135072> We don't need this since the idle conenctions only need to be closed on the server side. clients/src/main/java/org/apache/kafka/common/network/MultiSend.java <https://reviews.apache.org/r/33065/#comment135073> Actually, with MultiSend, we will be sending a 4-byte size plus the payload, the sum of which could be a bit larger than max_int. So, I think we need to make writeTo and size return long instead in Send. Sorry for the incorrect suggestion earlier. clients/src/main/java/org/apache/kafka/common/network/MultiSend.java <https://reviews.apache.org/r/33065/#comment135074> To be consistent, no need to wrap single line statement in {}. clients/src/main/java/org/apache/kafka/common/network/Selector.java <https://reviews.apache.org/r/33065/#comment135059> It seems that we don't really need IdenityHashMap to optimize performance. In the following link, HashMap gives comparable performance as IdenityHashMap on String keys since String caches the hashcode. http://java-performance.info/java-util-identityhashmap/ clients/src/main/java/org/apache/kafka/common/network/Selector.java <https://reviews.apache.org/r/33065/#comment135056> We only need to close idle connections on the server side. So, we can use MAX_LONG for connectionMaxIdleMs for the clients. But we need to see if maybeCloseOldestConnection handles overflows well. Alternatively, we can just not maintain the lruConnection on the client side. core/src/main/scala/kafka/api/FetchResponse.scala <https://reviews.apache.org/r/33065/#comment135077> sends can be private. core/src/main/scala/kafka/api/FetchResponse.scala <https://reviews.apache.org/r/33065/#comment135078> sends can be private. core/src/main/scala/kafka/network/SocketServer.scala <https://reviews.apache.org/r/33065/#comment135015> Our convention is to use () for methods with side effects. So, we should do wakeup(). Same is true for shutdownHook. core/src/main/scala/kafka/network/SocketServer.scala <https://reviews.apache.org/r/33065/#comment135044> It seems that we can just call wakeup() and get rid of shutdownHook. core/src/main/scala/kafka/network/SocketServer.scala <https://reviews.apache.org/r/33065/#comment135052> I guess we will make use of this in a followup patch? core/src/main/scala/kafka/network/SocketServer.scala <https://reviews.apache.org/r/33065/#comment135016> This is not enough to get the thread out of the loop. Also, we probably don't want to continue with the rest of the logic. The easiest way is to rethrow the exception to kill the thread. core/src/main/scala/kafka/network/SocketServer.scala <https://reviews.apache.org/r/33065/#comment135020> Actually, if the incoming request is invalid, the exception will be thrown here. So, we should handle the exception here. We probably should just handle Exception, instead of Throwable. core/src/main/scala/kafka/network/SocketServer.scala <https://reviews.apache.org/r/33065/#comment135019> Do we need to close again? We may hit the same exception as before. Also, see the comment above. core/src/test/resources/log4j.properties <https://reviews.apache.org/r/33065/#comment135080> These are probably not intended. - Jun Rao On May 15, 2015, 7:30 a.m., Gwen Shapira wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/33065/ > ----------------------------------------------------------- > > (Updated May 15, 2015, 7:30 a.m.) > > > Review request for kafka. > > > Bugs: 1928 and KAFKA-1928 > https://issues.apache.org/jira/browse/1928 > https://issues.apache.org/jira/browse/KAFKA-1928 > > > Repository: kafka > > > Description > ------- > > first pass on replacing Send > > > implement maxSize and improved docs > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > KAFKA-1928-v2 > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > KAFKA-1928-v2 > > Conflicts: > core/src/main/scala/kafka/network/RequestChannel.scala > > moved selector out of abstract thread > > > mid-way through putting selector in SocketServer > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > KAFKA-1928-v2 > > Also, SocketServer is now using Selector. Stil a bit messy - but all tests > pass. > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > KAFKA-1928-v2 > > > renamed requestKey to connectionId to reflect new use and changed type from > Any to String > > > Following Jun's comments - moved MultiSend to client. Cleaned up destinations > as well > > > removed reify and remaining from send/recieve API, per Jun. moved > maybeCloseOldest() to Selector per Jay > > > added idString to node API, changed written to int in Send API > > > cleaning up MultiSend, added size() to Send interface > > > fixed some issues with multisend > > > Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into > KAFKA-1928-v2 > > > fixed metric thingies > > > fixed response order bug > > > error handling for illegal selector state and fix metrics bug > > > optimized selection key lookup with identity hash > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java > da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 > clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java > 936487b16e7ac566f8bdcd39a7240ceb619fd30e > clients/src/main/java/org/apache/kafka/clients/KafkaClient.java > 1311f85847b022efec8cb05c450bb18231db6979 > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java > 435fbb5116e80302eba11ed1d3069cb577dbdcbd > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java > bdff518b732105823058e6182f445248b45dc388 > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java > d301be4709f7b112e1f3a39f3c04cfa65f00fa60 > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java > e55ab11df4db0b0084f841a74cbcf819caf780d5 > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java > ef9dd5238fbc771496029866ece1d85db6d7b7a5 > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > 8e336a3aa96c73f52beaeb56b931baf4b026cf21 > clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java > 187d0004c8c46b6664ddaffecc6166d4b47351e5 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java > 1e943d621732889a1c005b243920dc32cea7af66 > clients/src/main/java/org/apache/kafka/common/Node.java > f4e4186c7602787e58e304a2f1c293a633114656 > > clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java > 129ae827bccbd982ad93d56e46c6f5c46f147fe0 > clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java > c8213e156ec9c9af49ee09f5238492318516aaa3 > clients/src/main/java/org/apache/kafka/common/network/MultiSend.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java > fc0d168324aaebb97065b0aafbd547a1994d76a7 > clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java > 68327cd3a734fd429966d3e2016a2488dbbb19e5 > clients/src/main/java/org/apache/kafka/common/network/Receive.java > 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a > clients/src/main/java/org/apache/kafka/common/network/Selectable.java > b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 > clients/src/main/java/org/apache/kafka/common/network/Selector.java > 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 > clients/src/main/java/org/apache/kafka/common/network/Send.java > 5d321a09e470166a1c33639cf0cab26a3bce98ec > clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java > 27cbf390c7f148ffa8c5abc154c72cbf0829715c > clients/src/main/java/org/apache/kafka/common/requests/ResponseSend.java > PRE-CREATION > clients/src/test/java/org/apache/kafka/clients/MockClient.java > 5e3fab13e3c02eb351558ec973b949b3d1196085 > clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java > 8b278892883e63899b53e15efb9d8c926131e858 > clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java > d5b306b026e788b4e5479f3419805aa49ae889f3 > clients/src/test/java/org/apache/kafka/test/MockSelector.java > ea89b06a4c9e5bb351201299cd3037f5226f0e6c > core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala > 1c3b3802ac221d570e7610458e50518b4499e7ed > core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala > a3b1b78adb760eaeb029466b54f335a29caf3b0f > core/src/main/scala/kafka/api/ControlledShutdownRequest.scala > fe81635c864cec03ca1d4681c9c47c3fc4f975ee > core/src/main/scala/kafka/api/FetchRequest.scala > b038c15186c0cbcc65b59479324052498361b717 > core/src/main/scala/kafka/api/FetchResponse.scala > 75aaf57fb76ec01660d93701a57ae953d877d81c > core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala > 431190ab94afc4acfc14348a1fc720e17c071cea > core/src/main/scala/kafka/api/OffsetCommitRequest.scala > 317daed18db8b02635927d81fbcad623f137de5e > core/src/main/scala/kafka/api/OffsetFetchRequest.scala > fa8bd6a145fd3f08a5f78fcfa813ed7417ccffd2 > core/src/main/scala/kafka/api/OffsetRequest.scala > 3d483bc7518ad76f9548772522751afb4d046b78 > core/src/main/scala/kafka/api/ProducerRequest.scala > 570b2da1d865086f9830aa919a49063abbbe574d > core/src/main/scala/kafka/api/StopReplicaRequest.scala > 5e14987c990fe561c01dac2909f5ed21a506e038 > core/src/main/scala/kafka/api/TopicMetadataRequest.scala > 363bae01752318f3849242b97a6619747697c1d9 > core/src/main/scala/kafka/api/UpdateMetadataRequest.scala > 69f0397b187a737b4ddf50e390d3c2f418ce6b5d > core/src/main/scala/kafka/client/ClientUtils.scala > 62394c0d3813f19a443cf862c8bc6c5808be9f88 > core/src/main/scala/kafka/consumer/SimpleConsumer.scala > 31a2639477bf66f9a05d2b9b07794572d7ec393b > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala > aa8d9404a3e78a365df06404b79d0d8f694b4bd6 > core/src/main/scala/kafka/controller/ControllerChannelManager.scala > 6cf13f0a1f7f31ff9367197a435e0ae4427b6438 > core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala > b0b7be14d494ae8c87f4443b52db69d273c20316 > core/src/main/scala/kafka/network/BlockingChannel.scala > 6e2a38eee8e568f9032f95c75fa5899e9715b433 > core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala > c0d77261353478232ab85591c182be57845b3f13 > core/src/main/scala/kafka/network/BoundedByteBufferSend.scala > b95b73b71252932867c60192b3d5b91efe99e122 > core/src/main/scala/kafka/network/ByteBufferSend.scala > af30042a4c713418ecd83b6c6c17dfcbdc101c62 > core/src/main/scala/kafka/network/Handler.scala > a0300336b8cb5a2d5be68b7b48bdbe045bf99324 > core/src/main/scala/kafka/network/RequestChannel.scala > 1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026 > core/src/main/scala/kafka/network/RequestOrResponseSend.scala PRE-CREATION > core/src/main/scala/kafka/network/SocketServer.scala > edf6214278935c031cf493d72d266e715d43dd06 > core/src/main/scala/kafka/network/Transmission.scala > 2827103d7e57789bb04859bdeb9d4720c8bd060c > core/src/main/scala/kafka/producer/SyncProducer.scala > 0f09951329a8a8f86bd4d1512e8d10eb151ddb43 > core/src/main/scala/kafka/server/KafkaApis.scala > 417960dd1ab407ebebad8fdb0e97415db3e91a2f > core/src/main/scala/kafka/server/KafkaServer.scala > b7d2a2842e17411a823b93bdedc84657cbd62be1 > core/src/main/scala/kafka/server/MessageSetSend.scala > 566764850cc60b9d35a4b51abd89a8109f340f5d > core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala > d2bac85e16a247b1326f63619711fb0bbbd2e82a > core/src/test/resources/log4j.properties > 1b7d5d8f7d5fae7d272849715714781cad05d77b > core/src/test/scala/other/kafka/TestOffsetManager.scala > 9881bd3dff0591f315bd53aea96d3c6e12a24cb6 > core/src/test/scala/unit/kafka/network/SocketServerTest.scala > 95d562134c0414ddc3caaa1e1defeb246f585b0b > > Diff: https://reviews.apache.org/r/33065/diff/ > > > Testing > ------- > > > Thanks, > > Gwen Shapira > >