> On May 31, 2015, 9:10 p.m., Guozhang Wang wrote:
> > Thanks for the explanation Ewen. I agree that a delayed scheduler would be 
> > a good fit here, but was originally more concerned about the complexity we 
> > introduced by adding two queues (one for delayed actions and another for 
> > handling order-preserving commits). After thinking about it a bit more, I 
> > feel the complexity mainly comes from the places where we need to make 
> > blocking calls, which also have to trigger the delayed tasks' poll:
> > 
> > 
> > 1. Consumer.poll() where we will "block" until the timeout has elapsed.
> > 2. Consumer.awaitMetadataUpdate() where we block until metadata is refreshed.
> > 3. CommitOffsetHandler.poll() where we block until this request has completed 
> > via client.completeAll().
> > 
> > Basically we need to remember each of those places and make sure delayed 
> > tasks also get polled while we are blocking. So I am wondering if we could 
> > refactor this patch a bit as:
> > 
> > 1. Move DelayedTask / DelayedTaskQueue class to o.a.k.common.
> > 
> > 2. Add the delayedTask to KafkaClient with a new API alongside send(); 
> > more specifically we can:
> >   a. Rename send() to scheduleOnce(request), which "queues up the given 
> > request to be sent in the next poll";
> >   b. Add scheduleRecurring(request, interval), which "triggers scheduleOnce 
> > every interval".
> >   c. In poll(), check whether we should schedule a request via scheduleOnce 
> > as we did in this patch.
> > 
> > As for the commitOffsetRequests queue and the #.retry config, if we 
> > expect more requests like the sync commit to be added to the consumer in 
> > the future, we may want to make them more general, for example by 
> > turning them into ConsumerConfig.RETRIES_CONFIG (like 
> > ProducerConfig.RETRIES_CONFIG) and a Queue<RequestCompletionHandler> 
> > scheduledRequests.
> 
> Guozhang Wang wrote:
>     Also I am curious how KAFKA-2168 will be leveraging this patch to add the 
> wakeup call, could you elaborate a bit?

My patch for KAFKA-2168 (if accepted) may actually make this simpler since it 
centralizes all of the blocking calls in KafkaConsumer. Then it might not be 
necessary to push the task queue into the networking layer. 

I wonder, however, if we can do without the task queue by just letting 
asynchronous commit requests fail fast? If the consumer is using auto-commits, 
then we will retry the commit again at the next interval anyway. And if the 
user is the one doing the commit and they actually care about the result, then 
we provide a callback which will force them to handle errors regardless of 
whether we are retrying underneath the covers. (There's also just the fact that 
having to deal with a queue of pending commits feels odd given that it's really 
only the most recent ones that you care about.)
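
To make the fail-fast idea concrete, here is a rough sketch of what I have in 
mind. It is not code from the patch: FailFastCommitSketch, CommitCallback and 
CommitSender are hypothetical names used only for illustration, and the real 
ConsumerCommitCallback may look different. The point is only that each async 
commit is attempted once and the outcome goes straight to the callback, so no 
queue of pending commits is needed.

```java
public class FailFastCommitSketch {

    /** Hypothetical callback; error is null when the commit succeeded. */
    public interface CommitCallback {
        void onComplete(long offset, Exception error);
    }

    /** Hypothetical stand-in for the network layer: returns null on success, else the error. */
    public interface CommitSender {
        Exception send(long offset);
    }

    private final CommitSender sender;

    public FailFastCommitSketch(CommitSender sender) {
        this.sender = sender;
    }

    /** Attempts the commit exactly once: no retry and no queue of pending commits. */
    public void commitAsync(long offset, CommitCallback callback) {
        Exception error = sender.send(offset);
        // The caller decides what to do with a failure (retry, log, ignore).
        callback.onComplete(offset, error);
    }

    /** Auto-commit tick: a failure is ignored because the next tick commits a newer offset anyway. */
    public void autoCommitTick(long currentOffset) {
        commitAsync(currentOffset, (offset, error) -> { });
    }
}
```

A user who cares about the result would pass a callback that inspects the 
error and decides whether to call commitAsync again; the auto-commit path just 
waits for the next interval.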


- Jason


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33196/#review85920
-----------------------------------------------------------


On May 29, 2015, 6:11 p.m., Ewen Cheslack-Postava wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33196/
> -----------------------------------------------------------
> 
> (Updated May 29, 2015, 6:11 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2123
>     https://issues.apache.org/jira/browse/KAFKA-2123
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-2123: Add queuing of offset commit requests.
> 
> 
> KAFKA-2123: Add scheduler for delayed tasks in new consumer, add backoff for 
> commit retries, and simplify auto commit by using delayed tasks.
> 
> 
> KAFKA-2123: Make synchronous offset commits wait for previous requests to 
> finish in order.
> 
> 
> KAFKA-2123: Remove redundant calls to ensureNotClosed
> 
> 
> KAFKA-2123: Address review comments.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> bdff518b732105823058e6182f445248b45dc388 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> f50da825756938c193d7f07bee953e000e2627d9 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  b2764df11afa7a99fce46d1ff48960d889032d14 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
> 5b898c8f8ad5d0469f469b600c4b2eb13d1fc662 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
>  b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
>  PRE-CREATION 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> a1eed965a148eb19d9a6cefbfce131f58aaffc24 
> 
> Diff: https://reviews.apache.org/r/33196/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Ewen Cheslack-Postava
> 
>
