> On May 4, 2015, 4:51 p.m., Jun Rao wrote: > > core/src/main/scala/kafka/api/FetchResponse.scala, lines 152-153 > > <https://reviews.apache.org/r/33378/diff/1/?file=937083#file937083line152> > > > > This is tricky since FetchRequest is used in the follower as well. When > > doing a rolling upgrade of the broker to 0.8.3, we have to follow the > > following steps. > > 1. Configure intra.cluster.protocol to 0.8.2 and rolling upgrade each > > broker to 0.8.3. After this step, each broker understands version 1 of the > > fetch request, but still sends fetch request in version 0. > > 2. Configure intra.cluster.protocol to 0.8.3 and restart each broker. > > After this step, every broker will start sending fetch request in version 1. > > > > So, we need the logic in ReplicaFetcherThread to issue the right > > version of the FetchRequest according to intra.cluster.protocol. We also > > need to read the response according to the request version (i.e., can't > > just assume the response is always on the latest version).
Good point. I'm passing in the protocol version to use on the fetchRequest to the AbstractFetcherThread. Also, not reading the response based on the request version. - Aditya ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33378/#review82389 ----------------------------------------------------------- On May 7, 2015, 1:36 a.m., Aditya Auradkar wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/33378/ > ----------------------------------------------------------- > > (Updated May 7, 2015, 1:36 a.m.) > > > Review request for kafka, Joel Koshy and Jun Rao. > > > Bugs: KAFKA-2136 > https://issues.apache.org/jira/browse/KAFKA-2136 > > > Repository: kafka > > > Description > ------- > > Changes are: > - protocol changes to the fetch reuqest and response to return the > throttle_time_ms to clients > - New producer/consumer metrics to expose the avg and max delay time for a > client > - Test cases > > For now the patch will publish a zero delay and return a response > > Added more tests > > > Addressing Jun's comments > > > Formatting changes > > > Diffs > ----- > > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java > ef9dd5238fbc771496029866ece1d85db6d7b7a5 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java > b2db91ca14bbd17fef5ce85839679144fff3f689 > clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java > 3dc8b015afd2347a41c9a9dbc02b8e367da5f75f > clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java > 8686d83aa52e435c6adafbe9ff4bd1602281072a > clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java > eb8951fba48c335095cc43fc3672de1c733e07ff > clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java > fabeae3083a8ea55cdacbb9568f3847ccd85bab4 > clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java > 37ec0b79beafcf5735c386b066eb319fb697eff5 > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java > 419541011d652becf0cda7a5e62ce813cddb1732 > > clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java > 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 > > clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java > e3cc1967e407b64cc734548c19e30de700b64ba8 > core/src/main/scala/kafka/api/FetchRequest.scala > b038c15186c0cbcc65b59479324052498361b717 > core/src/main/scala/kafka/api/FetchResponse.scala > 75aaf57fb76ec01660d93701a57ae953d877d81c > core/src/main/scala/kafka/api/ProducerRequest.scala > 570b2da1d865086f9830aa919a49063abbbe574d > core/src/main/scala/kafka/api/ProducerResponse.scala > 5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330 > core/src/main/scala/kafka/consumer/SimpleConsumer.scala > 31a2639477bf66f9a05d2b9b07794572d7ec393b > core/src/main/scala/kafka/server/AbstractFetcherThread.scala > a439046e118b6efcc3a5a9d9e8acb79f85e40398 > core/src/main/scala/kafka/server/DelayedFetch.scala > de6cf5bdaa0e70394162febc63b50b55ca0a92db > core/src/main/scala/kafka/server/DelayedProduce.scala > 05078b24ef28f2f4e099afa943e43f1d00359fda > core/src/main/scala/kafka/server/KafkaApis.scala > 417960dd1ab407ebebad8fdb0e97415db3e91a2f > core/src/main/scala/kafka/server/OffsetManager.scala > 18680ce100f10035175cc0263ba7787ab0f6a17a > core/src/main/scala/kafka/server/ReplicaFetcherThread.scala > b31b432a226ba79546dd22ef1d2acbb439c2e9a3 > core/src/main/scala/kafka/server/ReplicaManager.scala > 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 > core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala > 5717165f2344823fabe8f7cfafae4bb8af2d949a > core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala > f3ab3f4ff8eb1aa6b2ab87ba75f72eceb6649620 > core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala > 00d59337a99ac135e8689bd1ecd928f7b1423d79 > > Diff: https://reviews.apache.org/r/33378/diff/ > > > Testing > ------- > > New tests added > > > Thanks, > > Aditya Auradkar > >