> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java, line 91
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line91>
> >
> > Is the retryBackoffMs here because that's how long you want to wait for a response before we jump back out of the poll, which fails the request and triggers another NetworkClient.poll() call, which in turn sends the MetadataRequest?
> >
> > Just trying to figure out the flow because a) the flow is unclear and b) this is the only use of metadata in this class. Would this make sense to push into NetworkClient, which is the one responsible for making the request anyway? I'm not sure it's a good idea since nothing besides this class uses NetworkClient anymore, but worth thinking about.
>
> Jason Gustafson wrote:
> Guozhang mentioned this code as well, which was copied from KafkaConsumer. I think the use of retryBackoffMs is incorrect. It probably could be poll(-1) since we are not actually resending any requests.
>
> To me, the weird thing has always been that metadata updates are handled directly in NetworkClient and not in the higher layers. I can see the reason for it, but it makes it difficult to attach code to metadata updates (which is exactly what we need to implement regex subscriptions). I would personally be in favor of pulling it out of NetworkClient, but that has implications for KafkaProducer as well. Moving this method into NetworkClient is probably a better idea at the moment and may give us a fighting chance to get the logic right.
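To make the trade-off in this exchange concrete, here is a minimal sketch. It assumes hypothetical types `MetadataAwareClient` and `RegexSubscription`, neither of which is Kafka's API. In this sketch the network layer consumes metadata responses itself, so callers only ever see other responses. It also exposes a listener hook through which a higher layer, such as a regex subscription, could react to topic changes. This is one possible way to reconcile the two positions, not what the patch does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.regex.Pattern;

/** Hypothetical network layer; illustrative only, not Kafka's NetworkClient. */
final class MetadataAwareClient {
    /** A received response: metadata responses carry topics, others carry a payload. */
    static final class Response {
        final Set<String> topics;  // non-null only for metadata responses
        final String payload;      // non-null only for other responses

        private Response(Set<String> topics, String payload) {
            this.topics = topics;
            this.payload = payload;
        }

        static Response metadata(Set<String> topics) { return new Response(topics, null); }
        static Response other(String payload) { return new Response(null, payload); }
    }

    private final List<Consumer<Set<String>>> metadataListeners = new ArrayList<>();
    private Set<String> knownTopics = Collections.emptySet();

    /** Hypothetical hook that lets a higher layer attach code to metadata updates. */
    void addMetadataListener(Consumer<Set<String>> listener) {
        metadataListeners.add(listener);
    }

    /**
     * Handles metadata responses internally (callers never see them) and
     * returns the payloads of all other responses to the caller.
     */
    List<String> poll(List<Response> received) {
        List<String> forCaller = new ArrayList<>();
        for (Response r : received) {
            if (r.topics != null) {
                knownTopics = Collections.unmodifiableSet(new TreeSet<>(r.topics));
                for (Consumer<Set<String>> listener : metadataListeners) {
                    listener.accept(knownTopics);
                }
            } else {
                forCaller.add(r.payload);
            }
        }
        return forCaller;
    }
}

/** Hypothetical regex subscription that recomputes its topics on each metadata update. */
final class RegexSubscription implements Consumer<Set<String>> {
    private final Pattern pattern;
    private final Set<String> subscribed = new TreeSet<>();

    RegexSubscription(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    @Override
    public void accept(Set<String> topics) {
        subscribed.clear();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                subscribed.add(topic);
            }
        }
    }

    Set<String> subscribed() {
        return Collections.unmodifiableSet(subscribed);
    }
}
```

With this shape, the producer's Sender and the consumer would stay unaware of metadata responses, while the regex matching lives entirely in the listener.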
When implementing the new producer, people felt it was better to let NetworkClient handle metadata responses separately from other responses. That way the high-level module (Sender for the producer, and "whatever the module for the consumer" would be at that time) does not need to handle them. I think this motivation is still valid today, but we did not think through regex subscriptions at that time.

> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java, line 121
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line121>
> >
> > Is this the behavior we want? Both timeout and delayedTasks.nextTimeout() can be arbitrarily small. For delayedTasks especially, it seems like we're tying the failure of requests to unrelated events?
> >
> > Previously, I thought request failures may happen quickly, but then there was a Utils.sleep backoff. I am not seeing how that is handled now? If the connection isn't established, won't poll() not send out requests, run client.poll(), possibly return very quickly before the connection is established/fails, then fail the unsent requests, then just return to the caller? And that caller might be one of those while() loops, so it may just continue to retry, busy looping while not actually accomplishing anything.
> >
> > If we're going to introduce queuing of send requests here, it seems like a timeout (rather than fast fail + backoff) might be a more natural solution. So rather than clearUnsentRequests, it might be clearExpiredRequests.
>
> Jason Gustafson wrote:
> This is not clear from the comments, but the main reason for the unsent request queue in this class is to get around NetworkClient's single-send behavior. Currently, NetworkClient will only accept one send at a time for a given node.
> If you have multiple requests, you have to sprinkle in a bunch of poll() calls and use ready() to make sure that they can all get buffered. This is annoying because you cannot attach a callback to a request until you are actually able to send it. This means you always have to send requests in a poll loop, which implies you can't really have asynchronous requests. The unsent request queue provides a way to buffer these requests before transporting to the network layer which allows us to give a future to the caller immediately.
>
> Now, when the user calls poll, we try to send all of the unsent requests immediately (by doing the ready()/poll() dance until nothing else can be sent). Note that none of this depends on the timeout passed to poll: we are just calling poll(0) until nothing can be sent. After that, then we call poll(timeout). It may be the case that there were some requests that failed to be sent. This could be because the client is still connecting to the node or that its send buffer is full. To make the behavior of this class easy to understand, I just fail these requests which means that the unsent request queue never accumulates beyond a poll call. So every request sent either gets transported or fails.
>
> I think the call to Utils.sleep(retryBackoffMs) was lost in the refactor. It should probably be invoked whenever we retry a request to the coordinator. One potential issue is that the configuration "retry.backoff.ms" (as its currently documented) is only intended for fetch requests, but we have been using it for all requests. Perhaps there should be a separate configuration? For example "coordinator.retry.backoff.ms"? Or perhaps a hard-coded value is really what we want.

Regarding the retry backoff: in the producer, RETRY_BACKOFF_MS_CONFIG is used for both produce and metadata requests. For the consumer, I suggest we use the session timeout to "guide" the coordinator retry backoff. More concretely:

1. Still use retry.backoff.ms for fetch / metadata / consumer-metadata requests.
2. The heartbeat interval is currently hard-coded as 1/3 of the session timeout; we could similarly hard-code the backoff of join-group / offset-commit / offset-fetch requests to be, say, 1/6 of the session timeout?

> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java, line 314
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line314>
> >
> > Why trigger commit refresh here?
>
> Jason Gustafson wrote:
> I copied this from older versions of the code, but I've wondered the same. It does ensure that we will not reset any partitions to an older position before the commit request returns. It also ensures that committed() will not return before a pending asynchronous commit. However, in both of those cases, I think the alternative behavior would be reasonable as well. In the end, I kept it in there because I didn't have a strong case against it and because it didn't seem like a bad thing to refresh commits from the server from time to time. Now that I think about it a little more, it actually makes asynchronous commits more costly than synchronous commits since they always result in an additional FetchCommittedOffsets request.

I feel it is OK either way for the committed() / position() APIs, i.e. we could define that "an async commit does (or does NOT) guarantee that subsequent committed() / position() calls return the committed offsets if it succeeded". However, I think we should define that "an async commit DOES guarantee that the subsequent poll() from a restarted / rebalanced consumer will start from the committed offsets if it succeeded". In other words, I think updateFetchPositions in pollOnce should check whether there is a commit in progress, and if so, fetch the committed positions. For committed() and position(), it is really about how we want to define their semantics.
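The rule proposed above for updateFetchPositions can be sketched roughly as follows. Everything here is hypothetical: `CommitAwarePositions`, `commitAsync`, `pollNetwork` and `refreshCommitted` are illustrative stand-ins, not the Coordinator or KafkaConsumer API. An in-memory map plays the coordinator's offset store, `refreshCommitted` plays a FetchCommittedOffsets round trip, and position 0 is a placeholder for whatever reset policy would really apply.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; not the Coordinator or KafkaConsumer API. */
final class CommitAwarePositions {
    // Stand-in for the offsets durably committed on the coordinator.
    private final Map<String, Long> coordinatorOffsets = new HashMap<>();
    // Consumer-side copy of committed offsets, updated only by refreshCommitted().
    private final Map<String, Long> cachedCommitted = new HashMap<>();
    // Responses not yet received; pollNetwork() delivers them.
    private final Deque<Runnable> pendingResponses = new ArrayDeque<>();
    private int commitsInFlight = 0;

    /** Sends an async commit; its outcome only takes effect when the response is polled. */
    void commitAsync(Map<String, Long> offsets, boolean willSucceed) {
        commitsInFlight++;
        pendingResponses.add(() -> {
            if (willSucceed) {
                coordinatorOffsets.putAll(offsets);
            }
            commitsInFlight--;
        });
    }

    boolean commitInProgress() {
        return commitsInFlight > 0;
    }

    /** Delivers all outstanding responses (stand-in for a network poll). */
    void pollNetwork() {
        while (!pendingResponses.isEmpty()) {
            pendingResponses.poll().run();
        }
    }

    /** Stand-in for a FetchCommittedOffsets round trip. */
    private void refreshCommitted() {
        cachedCommitted.clear();
        cachedCommitted.putAll(coordinatorOffsets);
    }

    /** Start position after a restart or rebalance (0 stands in for the reset policy). */
    long updateFetchPosition(String partition) {
        if (commitInProgress()) {
            // Proposed rule: let the in-flight commit finish, then re-read the committed offsets.
            while (commitInProgress()) {
                pollNetwork();
            }
            refreshCommitted();
        } else if (!cachedCommitted.containsKey(partition)) {
            refreshCommitted();
        }
        return cachedCommitted.getOrDefault(partition, 0L);
    }
}
```

In this sketch the extra refresh is tied to the position update while a commit is in flight, rather than being issued after every asynchronous commit. A failed commit simply leaves the previously committed offset in place.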
- Guozhang

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91016
-----------------------------------------------------------

On July 8, 2015, 9:19 p.m., Jason Gustafson wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36333/
> -----------------------------------------------------------
>
> (Updated July 8, 2015, 9:19 p.m.)
>
> Review request for kafka.
>
> Bugs: KAFKA-2123
> https://issues.apache.org/jira/browse/KAFKA-2123
>
> Repository: kafka
>
> Description
> -------
>
> KAFKA-2123; resolve problems from rebase
>
> Diffs
> -----
>
> clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02
> clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 7aa076084c894bb8f47b9df2c086475b06f47060
> clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java 51eae1944d5c17cf838be57adf560bafe36fbfbd
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a
> clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 4c0ecc3badd99727b5bd9d430364e61c184e0923
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java PRE-CREATION
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java d085fe5c9e2a0567893508a1c71f014fae6d7510
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java PRE-CREATION
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 405efdc7a59438731cbc3630876bda0042a3adb3
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ee1ede01efa070409b86f5d8874cd578e058ce51
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java PRE-CREATION
> core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 476973b2c551db5be3f1c54f94990f0dd15ff65e
> core/src/test/scala/integration/kafka/api/ConsumerTest.scala 92ffb91b5e039dc0d4cd0e072ca46db32f280cf9
>
> Diff: https://reviews.apache.org/r/36333/diff/
>
> Testing
> -------
>
> Thanks,
>
> Jason Gustafson
>
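Jason's second reply describes an unsent-request queue that works in four steps:

- Buffer each request and hand the caller a future immediately.
- On poll, run the ready()/poll(0) loop until nothing more can be sent.
- Then call poll(timeout).
- Finally, fail whatever is left.

That queue could look roughly like the sketch below. `Transport` and `BufferingClient` are hypothetical names. `Transport` only mimics NetworkClient's one-send-per-node behavior and is not its real interface.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical lower layer accepting at most one in-flight send per node (not Kafka's API). */
interface Transport {
    boolean ready(String node);  // can a send to this node be accepted right now?
    void send(String node, String request, CompletableFuture<String> response);
    void poll(long timeoutMs);   // performs I/O; may complete response futures
}

/** Hypothetical sketch of the unsent-request queue described in the review. */
final class BufferingClient {
    private static final class Unsent {
        final String node;
        final String request;
        final CompletableFuture<String> future = new CompletableFuture<>();

        Unsent(String node, String request) {
            this.node = node;
            this.request = request;
        }
    }

    private final Transport transport;
    private final List<Unsent> unsent = new ArrayList<>();

    BufferingClient(Transport transport) {
        this.transport = transport;
    }

    /** Buffers the request and gives the caller a future immediately. */
    CompletableFuture<String> send(String node, String request) {
        Unsent u = new Unsent(node, request);
        unsent.add(u);
        return u.future;
    }

    void poll(long timeoutMs) {
        // 1. Independent of timeoutMs: hand over whatever the transport accepts,
        //    calling poll(0) between rounds, until a round makes no progress.
        boolean progress = true;
        while (progress) {
            progress = false;
            Iterator<Unsent> it = unsent.iterator();
            while (it.hasNext()) {
                Unsent u = it.next();
                if (transport.ready(u.node)) {
                    transport.send(u.node, u.request, u.future);
                    it.remove();
                    progress = true;
                }
            }
            if (progress) {
                transport.poll(0);
            }
        }
        // 2. Only now wait for up to the caller's timeout.
        transport.poll(timeoutMs);
        // 3. Fail leftovers (e.g. still connecting, send buffer full) so the
        //    queue never outlives a single poll call.
        for (Unsent u : unsent) {
            u.future.completeExceptionally(new IllegalStateException("send failed to node " + u.node));
        }
        unsent.clear();
    }
}
```

Failing leftovers at the end of each poll keeps the queue bounded by a single call. The cost is the one Ewen raises: a caller that retries immediately can busy-loop while a connection is still being established. This is why a backoff (or an expiry-based clearExpiredRequests) matters.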