-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33196/#review85920
-----------------------------------------------------------

Thanks for the explanation, Ewen. I agree that a delayed scheduler would be a good fit here, but I was originally more concerned about the complexity we introduce by adding two queues (one for delayed actions and another for handling order-preserving commits). After thinking about it a bit more, I feel the complexity mainly comes from the places where we need to make blocking calls, which also have to trigger the delayed tasks' poll:

1. Consumer.poll(), where we will "block" until the timeout has elapsed.
2. Consumer.awaitMetadataUpdate(), where we block until the metadata is refreshed.
3. CommitOffsetHandler.poll(), where we block until the request has completed via client.completeAll().

Basically we need to remember each of those places and make sure delayed tasks also get polled while we are blocking.

So I am wondering if we could refactor this patch a bit as follows:

1. Move the DelayedTask / DelayedTaskQueue classes to o.a.k.common.
2. Add the DelayedTask to KafkaClient with a new API alongside send(); more specifically we can:
   a. Rename send() to scheduleOnce(request), which "queues up the given request to be sent in the next poll";
   b. Add scheduleRecurring(request, interval), which "triggers scheduleOnce every interval";
   c. In poll(), check whether we should schedule a request via scheduleOnce, as we did in this patch.

As for the commitOffsetRequests queue and the #.retry config, if we expect that more requests like the sync commit will be added to the consumer in the future, we may want to make them more general, for example ConsumerConfig.RETRIES_CONFIG (like ProducerConfig.RETRIES_CONFIG) and Queue<RequestCompletionHandler> scheduledRequests.

- Guozhang Wang


On May 29, 2015, 6:11 p.m., Ewen Cheslack-Postava wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail.
> To reply, visit:
> https://reviews.apache.org/r/33196/
> -----------------------------------------------------------
>
> (Updated May 29, 2015, 6:11 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-2123
>     https://issues.apache.org/jira/browse/KAFKA-2123
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-2123: Add queuing of offset commit requests.
>
>
> KAFKA-2123: Add scheduler for delayed tasks in new consumer, add backoff for commit retries, and simplify auto commit by using delayed tasks.
>
>
> KAFKA-2123: Make synchronous offset commits wait for previous requests to finish in order.
>
>
> KAFKA-2123: Remove redundant calls to ensureNotClosed
>
>
> KAFKA-2123: Address review comments.
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java bdff518b732105823058e6182f445248b45dc388
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 5b898c8f8ad5d0469f469b600c4b2eb13d1fc662
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java PRE-CREATION
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala a1eed965a148eb19d9a6cefbfce131f58aaffc24
>
> Diff: https://reviews.apache.org/r/33196/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Ewen Cheslack-Postava
>
>
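
To make the scheduleOnce / scheduleRecurring / poll() suggestion in the review comment a bit more concrete, here is a rough, purely illustrative sketch. It is not code from the patch and not the real KafkaClient interface: the class name SchedulingClientSketch, the use of plain strings as requests, and the explicit nowMs arguments (added so the example runs without a clock) are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch only; names and signatures are assumptions, not the KafkaClient API. */
class SchedulingClientSketch {

    /** A recurring request together with the time at which it is next due. */
    private static final class Recurring implements Comparable<Recurring> {
        final String request;
        final long intervalMs;
        final long dueMs;

        Recurring(String request, long intervalMs, long dueMs) {
            this.request = request;
            this.intervalMs = intervalMs;
            this.dueMs = dueMs;
        }

        @Override
        public int compareTo(Recurring other) {
            return Long.compare(dueMs, other.dueMs);
        }
    }

    // Delayed tasks ordered by due time (stand-in for a DelayedTaskQueue).
    private final PriorityQueue<Recurring> delayed = new PriorityQueue<>();
    // Requests waiting to be sent on the next poll().
    private final List<String> pending = new ArrayList<>();

    /** Queues up the given request to be sent in the next poll(). */
    public void scheduleOnce(String request) {
        pending.add(request);
    }

    /** Triggers scheduleOnce(request) every intervalMs, first at nowMs + intervalMs. */
    public void scheduleRecurring(String request, long intervalMs, long nowMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        delayed.add(new Recurring(request, intervalMs, nowMs + intervalMs));
    }

    /** Promotes due recurring requests, then returns everything "sent" by this poll. */
    public List<String> poll(long nowMs) {
        while (!delayed.isEmpty() && delayed.peek().dueMs <= nowMs) {
            Recurring task = delayed.poll();
            scheduleOnce(task.request);
            // Re-arm relative to now, so a late poll does not fire the task repeatedly.
            delayed.add(new Recurring(task.request, task.intervalMs, nowMs + task.intervalMs));
        }
        List<String> sent = new ArrayList<>(pending);
        pending.clear();
        return sent;
    }
}
```

The point of the sketch is that every blocking path only has to call poll(): delayed tasks are checked there, so callers such as Consumer.poll() or awaitMetadataUpdate() would not need to remember to poll a separate task queue.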