> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
> >  line 91
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line91>
> >
> >     Is the retryBackoffMs here because that's how long you want to wait for 
> > a response before we jump back out of the poll, which fails the request and 
> > triggers another NetworkClient.poll() call, which in turn sends the 
> > MetadataRequest?
> >     
> >     Just trying to figure out the flow because a) the flow is unclear and 
> > b) this is the only use of metadata in this class. Would this make sense to 
> > push into NetworkClient, which is the one responsible for making the 
> > request anyway? I'm not sure it's a good idea since nothing besides this 
> > class uses NetworkClient anymore, but worth thinking about.
> Jason Gustafson wrote:
>     Guozhang mentioned this code as well, which was copied from 
> KafkaConsumer. I think the use of retryBackoffMs is incorrect. It probably 
> could be poll(-1) since we are not actually resending any requests.
>     To me, the weird thing has always been that metadata updates are handled 
> directly in NetworkClient and not in the higher layers. I can see the reason 
> for it, but it makes it difficult to attach code to metadata updates (which 
> is exactly what we need to implement regex subscriptions). I would personally 
> be in favor of pulling it out of NetworkClient, but that has implications for 
> KafkaProducer as well. Moving this method into NetworkClient is probably a 
> better idea at the moment and may give us a fighting chance to get the logic 
> right.

When we implemented the new producer, people felt it was better to let 
NetworkClient handle metadata responses itself, separately from other 
responses, so that the higher-level module (Sender for the producer, and 
"whatever the module for consumer" would be at that time) does not need to 
handle them. I think this motivation is still valid today, but we did not 
think through regex subscriptions at that time.

> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
> >  line 121
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line121>
> >
> >     Is this the behavior we want? Both timeout and 
> > delayedTasks.nextTimeout() can be arbitrarily small. For delayedTasks 
> > especially, it seems like we're tying the failure of requests to unrelated 
> > events?
> >     
> >     Previously, I thought request failures may happen quickly, but then 
> > there was a Utils.sleep backoff. I am not seeing how that is handled now? 
> > If the connection isn't established, won't poll() not send out requests, 
> > run client.poll(), possibly return very quickly before the connection is 
> > established/fails, then fail the unsent requests, then just return to the 
> > caller? And that caller might be one of those while() loops, so it may just 
> > continue to retry, busy looping while not actually accomplishing anything.
> >     
> >     If we're going to introduce queuing of send requests here, it seems 
> > like a timeout (rather than fast fail + backoff) might be a more natural 
> > solution. So rather than clearUnsentRequests, it might be 
> > clearExpiredRequests.
> Jason Gustafson wrote:
>     This is not clear from the comments, but the main reason for the unsent 
> request queue in this class is to get around NetworkClient's single-send 
> behavior. Currently, NetworkClient will only accept one send at a time for a 
> given node. If you have multiple requests, you have to sprinkle in a bunch of 
> poll() calls and use ready() to make sure that they can all get buffered. 
> This is annoying because you cannot attach a callback to a request until you 
> are actually able to send it. This means you always have to send requests in 
> a poll loop, which implies you can't really have asynchronous requests. The 
> unsent request queue provides a way to buffer these requests before 
> transporting to the network layer which allows us to give a future to the 
> caller immediately. 
>     Now, when the user calls poll, we try to send all of the unsent requests 
> immediately (by doing the ready()/poll() dance until nothing else can be 
> sent). Note that none of this depends on the timeout passed to poll: we are 
> just calling poll(0) until nothing can be sent. After that, then we call 
> poll(timeout). It may be the case that there were some requests that failed 
> to be sent. This could be because the client is still connecting to the node 
> or that its send buffer is full. To make the behavior of this class easy to 
> understand, I just fail these requests which means that the unsent request 
> queue never accumulates beyond a poll call. So every request sent either gets 
> transported or fails.
>     I think the call to Utils.sleep(retryBackoffMs) was lost in the refactor. 
> It should probably be invoked whenever we retry a request to the coordinator. 
> One potential issue is that the configuration "retry.backoff.ms" (as it is 
> currently documented) is only intended for fetch requests, but we have been 
> using it for all requests. Perhaps there should be a separate configuration? 
> For example "coordinator.retry.backoff.ms"? Or perhaps a hard-coded value is 
> really what we want.
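
The send-or-fail flow Jason describes above (buffer the request, run the 
ready()/poll(0) dance until nothing more can be sent, call poll(timeout), then 
fail whatever is left) can be sketched roughly as below. This is a simplified, 
hypothetical model and not the code in the patch: UnsentQueueSketch, the 
Network interface, and the use of plain strings for nodes and requests are all 
assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Simplified, hypothetical model of the send-or-fail flow; all names here are
// illustrative and none of this is the actual ConsumerNetworkClient code.
final class UnsentQueueSketch {
    // Stand-in for the network layer: accepts at most one send at a time per node.
    interface Network {
        boolean ready(String node);              // can this node accept a send now?
        void send(String node, String request);  // hand one request to the network
        void poll(long timeoutMs);               // perform I/O, may free up nodes
    }

    private final Map<String, Queue<String>> unsent = new HashMap<>();
    private final Network network;

    UnsentQueueSketch(Network network) {
        this.network = network;
    }

    // Buffer a request; in the real design the caller would get a future here.
    void enqueue(String node, String request) {
        unsent.computeIfAbsent(node, n -> new ArrayDeque<>()).add(request);
    }

    // Returns the requests that could not be transported and were failed.
    List<String> poll(long timeoutMs) {
        // Phase 1: keep sending with poll(0) until nothing else can be sent.
        while (trySend()) {
            network.poll(0);
        }
        // Phase 2: the caller's timeout only applies to this final poll.
        network.poll(timeoutMs);
        // Phase 3: fail the leftovers so the queue never outlives a poll call.
        List<String> failed = new ArrayList<>();
        for (Queue<String> queue : unsent.values()) {
            failed.addAll(queue);
        }
        unsent.clear();
        return failed;
    }

    // Sends at most one request per ready node; true if anything was sent.
    private boolean trySend() {
        boolean sent = false;
        for (Map.Entry<String, Queue<String>> entry : unsent.entrySet()) {
            Queue<String> queue = entry.getValue();
            if (!queue.isEmpty() && network.ready(entry.getKey())) {
                network.send(entry.getKey(), queue.poll());
                sent = true;
            }
        }
        return sent;
    }
}
```

Note that in this model a node that is still connecting simply never reports 
ready(), so its requests fail at the end of the same poll call. Without a 
backoff in the caller, a retry loop around poll would spin, which is the 
concern Ewen raises.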

Regarding the retry backoff timeout: in the producer, RETRY_BACKOFF_MS_CONFIG 
is used for both produce and metadata requests. I suggest that for the consumer 
we use the session timeout to "guide" the coordinator retry backoff. More 
concretely:

1. Still use retry.backoff.ms for fetch / metadata / consumer-metadata requests.
2. The heartbeat interval is currently hard-coded as 1/3 of the session 
timeout; could we similarly hard-code the backoff of join-group / 
offset-commit / offset-fetch requests to be, say, 1/6 of the session timeout?
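
A minimal sketch of the two rules above, assuming both values are derived by 
integer division from the session timeout in milliseconds. SessionTimings and 
its method names are hypothetical and do not exist in the patch; the 1/6 
fraction is only the value floated here, not a settled choice.

```java
// Hypothetical helper, not part of the patch: derives consumer timing values
// from the session timeout, following the two rules proposed above.
final class SessionTimings {
    private SessionTimings() { }

    // Heartbeat interval: hard-coded as 1/3 of the session timeout.
    static long heartbeatIntervalMs(long sessionTimeoutMs) {
        return sessionTimeoutMs / 3;
    }

    // Proposed backoff for join-group / offset-commit / offset-fetch retries:
    // 1/6 of the session timeout (the exact fraction is only a suggestion).
    static long coordinatorRetryBackoffMs(long sessionTimeoutMs) {
        return sessionTimeoutMs / 6;
    }

    // Fetch / metadata / consumer-metadata requests keep using retry.backoff.ms;
    // coordinator requests use the value derived from the session timeout.
    static long backoffMs(boolean coordinatorRequest, long sessionTimeoutMs, long retryBackoffMs) {
        return coordinatorRequest ? coordinatorRetryBackoffMs(sessionTimeoutMs) : retryBackoffMs;
    }
}
```

For example, with a 30-second session timeout this gives a 10-second heartbeat 
interval and a 5-second coordinator retry backoff, so a consumer would get a 
few retries in before its session expires.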

> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
> >  line 314
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line314>
> >
> >     Why trigger commit refresh here?
> Jason Gustafson wrote:
>     I copied this from older versions of the code, but I've wondered the 
> same. It does ensure that we will not reset any partitions to an older 
> position before the commit request returns. It also ensures that committed() 
> will not return before a pending asynchronous commit. However, in both of 
> those cases, I think the alternative behavior would be reasonable as well. In 
> the end, I kept it in there because I didn't have a strong case against it 
> and because it didn't seem like a bad thing to refresh commits from the 
> server from time to time. Now that I think about it a little more, it 
> actually makes asynchronous commits more costly than synchronous commits 
> since they always result in an additional FetchCommittedOffsets request.

I feel either way is OK for the committed() / position() APIs, i.e. we could 
define that an async commit does or does NOT guarantee that subsequent 
committed() / position() calls return the committed offsets, even if the 
commit succeeded. 

However, I think we should define that an async commit DOES guarantee that the 
subsequent poll() from a restarted / rebalanced consumer will start from the 
committed offsets if the commit succeeded. 

In other words, I think updateFetchPositions in pollOnce should check whether 
there is a commit in progress and, if so, fetch the committed positions; while 
for committed() and position() it is really about how we want to define their 

- Guozhang

This is an automatically generated e-mail. To reply, visit:

On July 8, 2015, 9:19 p.m., Jason Gustafson wrote:
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36333/
> -----------------------------------------------------------
> (Updated July 8, 2015, 9:19 p.m.)
> Review request for kafka.
> Bugs: KAFKA-2123
>     https://issues.apache.org/jira/browse/KAFKA-2123
> Repository: kafka
> Description
> -------
> KAFKA-2123; resolve problems from rebase
> Diffs
> -----
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> fd98740bff175cc9d5bc02e365d88e011ef65d22 
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
> eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> 7aa076084c894bb8f47b9df2c086475b06f47060 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> 46e26a665a22625d50888efa7b53472279f36e79 
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
>  695eaf63db9a5fa20dc2ca68957901462a96cd96 
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
>  51eae1944d5c17cf838be57adf560bafe36fbfbd 
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
>  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
>  683745304c671952ff566f23b5dd4cf3ab75377a 
> clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
> clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
> clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
> clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
> clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
> clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
> 4c0ecc3badd99727b5bd9d430364e61c184e0923 
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
>  d085fe5c9e2a0567893508a1c71f014fae6d7510 
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
>  405efdc7a59438731cbc3630876bda0042a3adb3 
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
>  ee1ede01efa070409b86f5d8874cd578e058ce51 
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
> 476973b2c551db5be3f1c54f94990f0dd15ff65e 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> 92ffb91b5e039dc0d4cd0e072ca46db32f280cf9 
> Diff: https://reviews.apache.org/r/36333/diff/
> Testing
> -------
> Thanks,
> Jason Gustafson
