----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/#review91012 -----------------------------------------------------------
Browsed through the patch, overall looks very promising. I am not very clear on a few detailed changes though: 1. The request future adapter / handler modifications. 2. Retry backoff implementation seems not correct. Could you explain a little bit on these two aspects? clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java (line 27) <https://reviews.apache.org/r/36333/#comment144213> You may want to add the committed offset map in the callback since otherwise it is unclear which commit it is referring to when triggered. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (lines 542 - 545) <https://reviews.apache.org/r/36333/#comment144216> Is there any particular reason you want to materialize the sessionTimeoutMs variable? It seems only referred once at line 545. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (line 765) <https://reviews.apache.org/r/36333/#comment144219> I think KAFKA-1894 is already fixed in this patch + KAFKA-2168? clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (line 786) <https://reviews.apache.org/r/36333/#comment144233> Add some comments: "re-schedule the commit task for the next commit interval"? clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java (lines 88 - 93) <https://reviews.apache.org/r/36333/#comment144252> This is not introduced in the patch, but I am not sure if this is the right way to respect backoff time. For example, if the destination broker is down for a short period of time, poll(retryBackoffMs) will immediately return, and hence this function will busy triggering poll() and fluding the network with metadata requests right? What we want in this case, is that the consumer should wait for retryBackoffMs before retry sending the next metadata request. clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java (line 119) <https://reviews.apache.org/r/36333/#comment144250> This function can be private. clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java (lines 135 - 138) <https://reviews.apache.org/r/36333/#comment144253> Same as above in awaitMetadataUpdate(). clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 114) <https://reviews.apache.org/r/36333/#comment144275> We can remove this line since it is checked inside ensureAssignment() already. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 137) <https://reviews.apache.org/r/36333/#comment144239> Renaming to "ensurePartitionAssigned"? clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 212) <https://reviews.apache.org/r/36333/#comment144276> There is a potential risk of not aligning the scheduling of heartbeat with the discovery of the coordinator. For example, let's say: 1. at t0 we call initHeartbeatTask with interval 100; 2. at t1 the consumer already find the coordinator, but it will not send the first HB until t100; 3. at t100 the consumer may find itself already been kicked out of the group by the coordinator, and reschedule at t200 and re-join group. 4. at t101 the consumer has re-joined the group, but will not send the HB until t200, and so on .. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 280) <https://reviews.apache.org/r/36333/#comment144277> Why we change the behavior to directly throw exception here? clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java (line 22) <https://reviews.apache.org/r/36333/#comment144246> This comment is not correct since the function returns void. clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java (lines 1 - 43) <https://reviews.apache.org/r/36333/#comment144278> Not clear why you want to convert the future type in this adapter, can you elaborate a bit? - Guozhang Wang On July 8, 2015, 9:19 p.m., Jason Gustafson wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/36333/ > ----------------------------------------------------------- > > (Updated July 8, 2015, 9:19 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-2123 > https://issues.apache.org/jira/browse/KAFKA-2123 > > > Repository: kafka > > > Description > ------- > > KAFKA-2123; resolve problems from rebase > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java > fd98740bff175cc9d5bc02e365d88e011ef65d22 > > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java > eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java > 7aa076084c894bb8f47b9df2c086475b06f47060 > clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java > 46e26a665a22625d50888efa7b53472279f36e79 > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java > c1c8172cd45f6715262f9a6f497a7b1797a834a3 > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java > 695eaf63db9a5fa20dc2ca68957901462a96cd96 > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java > 51eae1944d5c17cf838be57adf560bafe36fbfbd > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java > 13fc9af7392b4ade958daf3b0c9a165ddda351a6 > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java > 683745304c671952ff566f23b5dd4cf3ab75377a > > clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/common/protocol/Errors.java > 4c0ecc3badd99727b5bd9d430364e61c184e0923 > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java > PRE-CREATION > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java > d085fe5c9e2a0567893508a1c71f014fae6d7510 > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java > PRE-CREATION > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java > 405efdc7a59438731cbc3630876bda0042a3adb3 > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java > ee1ede01efa070409b86f5d8874cd578e058ce51 > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java > PRE-CREATION > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala > 476973b2c551db5be3f1c54f94990f0dd15ff65e > core/src/test/scala/integration/kafka/api/ConsumerTest.scala > 92ffb91b5e039dc0d4cd0e072ca46db32f280cf9 > > Diff: https://reviews.apache.org/r/36333/diff/ > > > Testing > ------- > > > Thanks, > > Jason Gustafson > >